/*
 * storage : structured storage of application data
 *
 *   use DEFINE_STORAGE_GROUP(G, C, QoS, T) to create storage group / transaction context
 *   use DECLARE_STORAGE_GROUP(G) to forward-declare the storage group G (suitable for declaration in program headers)
 *   use HSTORE(G,N,V0,V1,...) to record the data V0,V1,... with the name N in the group G
 *   use HLOG  (G,N,"text display",V0,V1,...) to record the data V0,V1,... with the name N in the group G (with the display hint "text display" to reconstruct text)
 *
 */

#ifndef HSTORE_H_INCLUDED
#define HSTORE_H_INCLUDED

#include <array>
#include <atomic>
#include <cassert>
#include <cerrno>
#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <functional>
#include <iostream>
#include <map>
#include <mutex>
#include <sstream>
#include <stdexcept>
#include <string>
#include <tuple>
#include <vector>

#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/mman.h>
#include <sys/un.h>
#include <sys/socket.h>
#include <sys/ioctl.h>
#include <sys/time.h>
#include <netinet/in.h>
#include <netdb.h>
#include <fcntl.h>

// types with static reflection info (for reflective structs, variants, etc)
#include "reflect.H"

namespace hobbes { namespace storage { namespace internal {

namespace spin {

// guard on the clock actually used below (previously this tested
// CLOCK_REALTIME while calling clock_gettime with CLOCK_MONOTONIC, which
// fails to compile on a platform defining only the former)
#if defined(CLOCK_MONOTONIC)
inline long poll_tickNS() {
  // monotonic nanosecond tick for poll/timeout bookkeeping; returns 0 on failure
  timespec ts;
  if (clock_gettime(CLOCK_MONOTONIC, &ts) == 0) {
    return ts.tv_sec * 1000000000L + ts.tv_nsec;
  } else {
    return 0;
  }
}
#else
inline long poll_tickNS() {
  // fallback: wall-clock microseconds scaled to nanoseconds (not monotonic)
  struct timeval t;
  if (gettimeofday(&t, 0) == 0) {
    return ((t.tv_sec*1000000)+t.tv_usec)*1000;
  } else {
    return 0;
  }
}
#endif

// no-op wake: spin-based waiters poll the word directly, so there is
// nothing to notify
static inline void wakeN(volatile uint32_t*, int) {
}

} // namespace spin
}}} // namespace hobbes { namespace storage { namespace internal

// a few things have to be OS-specific here
//  * available waiting strategy in shared memory
//  * default waiting strategy
#if defined(__APPLE__) && defined(__MACH__)
// macOS doesn't support this flag to mmap so we can just 0 it out
#ifndef MAP_POPULATE
#define MAP_POPULATE 0
#endif

namespace hobbes { namespace storage { namespace internal {

namespace spin {

// Busy-poll until *p changes away from 'eqV', napping briefly between
// bursts of probes to avoid pegging a core.
static inline void waitForUpdate(volatile uint32_t* p, int eqV) {
  while (true) {
    for (size_t c = 0; c < 4096; ++c) {
      // explicit cast: *p is uint32_t, eqV is int (matches the Linux variant,
      // which already casts; avoids a signed/unsigned comparison)
      if (*p != uint32_t(eqV)) {
        return;
      }
    }
    usleep(500);
  }
}
// As above, but invoke 'timeoutF' each time 'timeoutNS' elapses without an
// update; a timeout of 0 means wait indefinitely.
static inline void waitForUpdate(volatile uint32_t* p, int eqV, size_t timeoutNS, const std::function<void()>& timeoutF) {
  if (timeoutNS == 0) {
    waitForUpdate(p, eqV);
    return;
  }

  long t0 = poll_tickNS();

  while (true) {
    for (size_t c = 0; c < 4096; ++c) {
      if (*p != uint32_t(eqV)) {
        return;
      }
    }

    long t1 = poll_tickNS();
    // cast to long: comparing long >= size_t promotes to unsigned, so a
    // negative delta would wrongly look huge (matches the Linux variant)
    if ((t1-t0) >= long(timeoutNS)) {
      timeoutF();
      t0 = t1;
    }
    usleep(500);
  }
}

} // namespace spin

namespace platform = spin;

}}} // namespace hobbes { namespace storage { namespace internal

#else

#include <linux/futex.h>
#include <sys/syscall.h>

namespace hobbes { namespace storage { namespace internal {

namespace spin {

// Pure busy-poll (no sleeping) until the watched word no longer equals 'eqV'.
static inline void waitForUpdate(volatile uint32_t* p, int eqV) {
  for (;;) {
    size_t probes = 4096;
    while (probes-- != 0) {
      if (*p != uint32_t(eqV)) {
        return;
      }
    }
  }
}
// As above, but invoke 'timeoutF' each time 'timeoutNS' elapses with no
// update observed; a timeout of 0 means poll forever.
static inline void waitForUpdate(volatile uint32_t* p, int eqV, size_t timeoutNS, const std::function<void()>& timeoutF) {
  if (timeoutNS == 0) {
    waitForUpdate(p, eqV);
    return;
  }

  long started = poll_tickNS();

  for (;;) {
    size_t probes = 4096;
    while (probes-- != 0) {
      if (*p != uint32_t(eqV)) {
        return;
      }
    }

    long now = poll_tickNS();
    if ((now - started) >= long(timeoutNS)) {
      timeoutF();
      started = now;
    }
  }
}
} // namespace spin

namespace futex {

// thin wrapper over the raw futex syscall (glibc exposes no futex() function)
static inline long sys_futex(volatile uint32_t* p, int op, int v, struct timespec* timeout, void* p2, int v2) {
  return syscall(SYS_futex, p, op, v, timeout, p2, v2);
}
// Sleep in the kernel while *p == eqV; may return spuriously (signal, or a
// wake without a value change), so callers re-check their condition in a loop.
static inline void waitForUpdate(volatile uint32_t* p, int eqV) {
  sys_futex(p, FUTEX_WAIT, eqV, nullptr, nullptr, 0);
}
// As above, but invoke 'timeoutF' each time 'timeoutNS' elapses without an
// update; a timeout of 0 means wait indefinitely.
static inline void waitForUpdate(volatile uint32_t* p, int eqV, size_t timeoutNS, const std::function<void()>& timeoutF) {
  if (timeoutNS == 0) {
    waitForUpdate(p, eqV);
    return;
  }

  // FUTEX_WAIT takes a *relative* timeout, so the same timespec is reused on
  // every loop iteration
  struct timespec ts;
  ts.tv_sec  = timeoutNS / 1000000000L;
  ts.tv_nsec = timeoutNS % 1000000000L;

  bool c = true;
  while (c) {
    int r = sys_futex(p, FUTEX_WAIT, eqV, &ts, nullptr, 0);
        c = r < 0 && errno == ETIMEDOUT; // loop only on timeout; any other return means woken or value changed

    if (c) {
      timeoutF();
    }
  }
}
// wake up to 'c' threads blocked in FUTEX_WAIT on 'p'
static inline void wakeN(volatile uint32_t* p, int c) {
  sys_futex(p, FUTEX_WAKE, c, nullptr, nullptr, 0);
}

} // namespace futex

namespace platform = futex;

}}} // namespace hobbes { namespace storage { namespace internal
#endif

namespace hobbes { namespace storage {

// function-pointer types for the pluggable wait/wake strategies below
using WaitFn = void (*)(volatile uint32_t *, int, size_t, const std::function<void ()> &);
using WakeFn = void (*)(volatile uint32_t *, int);

// selects how a blocked queue reader/writer waits for the other side
enum WaitPolicy {
  Platform = 0, // best available strategy for the OS (futex on Linux, spin on macOS)
  Spin,         // busy-poll in user space
};

// resolve a wait policy to its blocking-wait implementation
// (anything other than Spin falls back to the platform default)
static inline WaitFn waitFn(const WaitPolicy s) {
  if (s == Spin) {
    return &internal::spin::waitForUpdate;
  }
  return &internal::platform::waitForUpdate;
}

// resolve a wait policy to its waiter-wake implementation
// (anything other than Spin falls back to the platform default)
static inline WakeFn wakeFn(const WaitPolicy s) {
  if (s == Spin) {
    return &internal::spin::wakeN;
  }
  return &internal::platform::wakeN;
}

}}

namespace hobbes { namespace storage {

#define HSTORE_VERSION static_cast<uint32_t>(0x00020000)

using bytes = std::vector<uint8_t>;

// write transactions into shared memory
#define PRIV_HSTORE_LIKELY(x)   __builtin_expect((x),1)
#define PRIV_HSTORE_UNLIKELY(x) __builtin_expect((x),0)

// round 'x' up to the nearest multiple of 'm'; an 'm' of 0 means no alignment
template <typename T>
  T align(T x, T m) {
    if (m == 0) {
      return x;
    }
    T rem = x % m;
    return (rem == 0) ? x : x + (m - rem);
  }

#define PRIV_HSTORE_SPIN_MIN 512
#define PRIV_HSTORE_SPIN_MAX 524288

// burn roughly 'count' iterations (the volatile counter defeats optimization)
// and report a doubled count for exponential back-off
static inline unsigned spin(unsigned count) {
  volatile unsigned i = 0;
  while (i != count) {
    ++i;
  }
  return count << 1;
}

#if defined(__aarch64__)
//https://docs.huihoo.com/doxygen/linux/kernel/3.7/arch_2arm64_2include_2asm_2cmpxchg_8h_source.html
// atomic exchange via load-acquire/store-release exclusive pairs, width
// dispatched from the pointee size
#define uxchg(ptr,x) \
        __xchg((x),(ptr),sizeof(*(ptr))) 

// NOTE(review): the old value is loaded into 'ret' but never returned (the
// function is void) — callers in this file only use the store side, matching
// the x86 variant below which also discards the previous value
static inline void __xchg(volatile unsigned long x, volatile void *ptr, std::size_t size)
{
    unsigned long ret, tmp;

    // each asm loop retries until the exclusive store succeeds (cbnz on the
    // store-exclusive status register)
    switch (size) {
    case 1:
        asm volatile("//    __xchg1\n"
        "1: ldaxrb  %w0, [%3]\n"
        "   stlxrb  %w1, %w2, [%3]\n"
        "   cbnz    %w1, 1b\n"
            : "=&r" (ret), "=&r" (tmp)
            : "r" (x), "r" (ptr)
            : "memory", "cc");
        break;
    case 2:
        asm volatile("//    __xchg2\n"
        "1: ldaxrh  %w0, [%3]\n"
        "   stlxrh  %w1, %w2, [%3]\n"
        "   cbnz    %w1, 1b\n"
            : "=&r" (ret), "=&r" (tmp)
            : "r" (x), "r" (ptr)
            : "memory", "cc");
        break;
    case 4:
        asm volatile("//    __xchg4\n"
        "1: ldaxr   %w0, [%3]\n"
        "   stlxr   %w1, %w2, [%3]\n"
        "   cbnz    %w1, 1b\n"
            : "=&r" (ret), "=&r" (tmp)
            : "r" (x), "r" (ptr)
            : "memory", "cc");
        break;
    case 8:
        asm volatile("//    __xchg8\n"
        "1: ldaxr   %0, [%3]\n"
        "   stlxr   %w1, %2, [%3]\n"
        "   cbnz    %w1, 1b\n"
            : "=&r" (ret), "=&r" (tmp)
            : "r" (x), "r" (ptr)
            : "memory", "cc");
        break;
    default:
        assert(0);
    }
}
#else
// atomic exchange of *px with nx using the implicitly-locked xchgl instruction
// (the swapped-out old value lands in nx and is discarded)
static inline void uxchg(volatile uint32_t* px, uint32_t nx) {
  __asm__ __volatile__(
    "xchgl %0,%1"
    :"=r" (nx)
    :"m" (*px), "0" (nx)
    :"memory"
  );
}
#endif

#define xchg __sync_lock_test_and_set

// define a local socket for registering new storage queues
// write the full 'len' bytes to 'fd', resuming after partial writes and EINTR;
// throws std::runtime_error on any other error
inline void mqwrite(int fd, const uint8_t* x, size_t len) {
  size_t done = 0;
  while (done < len) {
    ssize_t n = write(fd, x + done, len - done);
    if (n >= 0) {
      done += n;
    } else if (errno != EINTR) {
      throw std::runtime_error("Couldn't write to socket: " + std::string(strerror(errno)));
    }
  }
}

// Read exactly 'len' bytes from 'fd' into 'x', retrying on EINTR.
// Throws std::runtime_error on hard errors and on EOF.  (Previously EOF
// looped forever since 'i += 0' made no progress, and a negative return
// with errno == EINTR fell into 'i += ret', wrapping the unsigned index.)
inline void mqread(int fd, uint8_t* x, size_t len) {
  size_t i = 0;
  while (i < len) {
    ssize_t ret = read(fd, x + i, len - i);
    if (ret > 0) {
      i += static_cast<size_t>(ret);
    } else if (ret == 0) {
      throw std::runtime_error("Couldn't read from socket: peer closed connection");
    } else if (errno != EINTR) {
      throw std::runtime_error("Couldn't read from socket: " + std::string(strerror(errno)));
    }
  }
}

// Connect to a group host's unix-domain socket at 'fileName' and send the
// protocol version handshake; returns the connected fd.
// Throws std::runtime_error on failure, closing the fd on every error path
// (previously a throwing mqwrite leaked the fd, and over-long pathnames were
// silently truncated by snprintf instead of rejected like mqlisten does).
inline int mqconnect(const std::string& fileName) {
  sockaddr_un addr;
  if (fileName.size() > sizeof(addr.sun_path) - 1) {
    throw std::runtime_error("Domain socket pathname too long: " + fileName);
  }

  int r = socket(AF_UNIX, SOCK_STREAM, 0);
  if (r == -1) {
    throw std::runtime_error("Unable to allocate socket: " + std::string(strerror(errno)));
  }

  memset(&addr, 0, sizeof(addr));
  addr.sun_family = AF_UNIX;
  snprintf(addr.sun_path, sizeof(addr.sun_path), "%s", fileName.c_str());

  auto*     saddr = reinterpret_cast<sockaddr*>(&addr);
  socklen_t len   = sizeof(addr);
  if (connect(r, saddr, len) == -1) {
    std::string emsg = "Unable to connect socket: " + std::string(strerror(errno));
    close(r);
    throw std::runtime_error(emsg);
  }

  // wait until the socket is writeable before starting the handshake
  fd_set wd;
  FD_ZERO(&wd);
  FD_SET(r, &wd);
  if (select(r + 1, nullptr, &wd, nullptr, nullptr) == -1) {
    std::string emsg = "Failed to connect socket while waiting for writeability: " + std::string(strerror(errno));
    close(r);
    throw std::runtime_error(emsg);
  }

  // handshake: announce our protocol version (don't leak the fd on failure)
  auto version = HSTORE_VERSION;
  try {
    mqwrite(r, reinterpret_cast<const uint8_t*>(&version), sizeof(version));
  } catch (...) {
    close(r);
    throw;
  }
  return r;
}

// Bind and listen on a unix-domain stream socket at 'fileName', replacing any
// stale socket file from a previous run; returns the listening fd.
// Throws std::runtime_error on failure.  The pathname-length check now runs
// before socket() so the early throw no longer leaks a file descriptor.
inline int mqlisten(const std::string& fileName) {
  sockaddr_un addr;
  if (fileName.size() > sizeof(addr.sun_path) - 1) {
    throw std::runtime_error("Domain socket pathname too long: " + fileName);
  }

  int s = socket(AF_UNIX, SOCK_STREAM, 0);
  if (s == -1) {
    throw std::runtime_error("Unable to allocate socket: " + std::string(strerror(errno)));
  }

  memset(&addr, 0, sizeof(addr));
  addr.sun_family = AF_UNIX;
  // remove any stale socket file so bind() doesn't fail with EADDRINUSE
  unlink(fileName.c_str());
  snprintf(addr.sun_path, sizeof(addr.sun_path), "%s", fileName.c_str());

  if (bind(s, reinterpret_cast<sockaddr*>(&addr), sizeof(addr)) == -1) {
    std::string emsg = "Unable to bind socket to file: " + fileName + " (" + std::string(strerror(errno)) + ")";
    close(s);
    throw std::runtime_error(emsg);
  }

  // and then start to listen
  if (listen(s, SOMAXCONN) == -1) {
    std::string emsg = "Unable to listen socket on file: " + fileName + " (" + std::string(strerror(errno)) + ")";
    close(s);
    throw std::runtime_error(emsg);
  }

  return s;
}

// Pick a writable directory for storage artifacts, preferring explicit
// configuration: $HOBBES_STORE_DIR, then $TMPDIR, then $TMP (each must be
// non-empty and writable), then /var/tmp, and finally /tmp as a last resort.
// (The three copy-pasted env-var checks are collapsed into one loop.)
inline std::string defaultStoreDir() {
  const char* candidates[] = { "HOBBES_STORE_DIR", "TMPDIR", "TMP" };
  for (const char* var : candidates) {
    const char* td = ::getenv(var);
    if (td != nullptr && ::strlen(td) > 0 && ::access(td, W_OK) == 0) {
      return std::string(td);
    }
  }
  if (::access("/var/tmp", W_OK) == 0) {
    return "/var/tmp";
  }
  return "/tmp";
}

// connect to the consumer listening for group 'groupName', wrapping any
// failure with a message naming the group and socket path
inline int connectGroupHost(const std::string& groupName, const std::string& sdir = defaultStoreDir()) {
  const auto socketPath = sdir + "/hstore." + groupName + ".sk";
  try {
    return mqconnect(socketPath);
  } catch (std::exception& e) {
    throw std::runtime_error("Failed to connect to log consumer for group '" + groupName + "' on socket '" + socketPath + "' (" + std::string(e.what()) + ")");
  }
}

// start listening for producers of group 'groupName', wrapping any failure
// with a message naming the group and socket path
inline int makeGroupHost(const std::string& groupName, const std::string& sdir = defaultStoreDir()) {
  const auto socketPath = sdir + "/hstore." + groupName + ".sk";
  try {
    return mqlisten(socketPath);
  } catch (std::exception& e) {
    throw std::runtime_error("Failed to listen for log connections for group '" + groupName + "' on socket '" + socketPath + "' (" + std::string(e.what()) + ")");
  }
}

// identify this process/thread
using ProcThread = std::pair<uint64_t, uint64_t>;

// (pid, tid) for the calling thread; the tid source is OS-specific
inline ProcThread thisProcThread() {
  ProcThread id;
  id.first = static_cast<uint64_t>(getpid());
#if defined(__APPLE__) && defined(__MACH__)
  pthread_threadid_np(0, &id.second);
#else
  id.second = static_cast<uint64_t>(syscall(SYS_gettid));
#endif
  return id;
}

// derive a name for a shared memory region with this group name in this thread/process
// (format: "/<group>.<pid>.<tid>")
inline std::string sharedMemName(const std::string& groupName, const ProcThread& pt) {
  std::ostringstream nm;
  nm << "/" << groupName << "." << pt.first << "." << pt.second;
  return nm.str();
}
inline std::string sharedMemName(const std::string& groupName) {
  return sharedMemName(groupName, thisProcThread());
}

// register the allocation of a shared memory region with this group name in this thread/process
inline void registerSHMAlloc(int* mqserver, const char* groupName, const WaitPolicy wp, bool reconnect = false, ProcThread pt = thisProcThread()) {
  // lazily establish the connection to the group host
  if (*mqserver < 0) {
    *mqserver = connectGroupHost(groupName);
  }

  // message layout: [cmd byte][ProcThread], where cmd bit 0 is the reconnect
  // flag and the remaining bits carry the wait policy
  uint8_t msg[1+sizeof(ProcThread)] = {0};
  msg[0] = static_cast<uint8_t>((reconnect ? 1 : 0) | (static_cast<uint8_t>(wp) << 1));
  memcpy(msg + 1, &pt, sizeof(ProcThread));
  mqwrite(*mqserver, msg, sizeof(msg));
}

// Detect whether the group host hung up and, if so, reconnect and re-register
// every known queue in 'pts'.  Best-effort: failures leave '*mqserver' at -1
// so a later call can retry.
inline void reconnectSHM(int* mqserver, const char* groupName, const WaitPolicy wp, const std::vector<ProcThread>& pts) {
  // has the other side disconnected?
  if (*mqserver >= 0) {
    fd_set rd;
    FD_ZERO(&rd);
    FD_SET(*mqserver, &rd);

    // near-zero timeout: this is a non-blocking liveness probe
    timeval tmout;
    tmout.tv_sec = 0;
    tmout.tv_usec = 1;

    auto sr = select(*mqserver+1, &rd, nullptr, nullptr, &tmout);
    if (sr == 1) {
      char b=0;
      // readable but returning 0 bytes on MSG_PEEK means EOF (peer closed)
      if (recv(*mqserver, &b, 1, MSG_PEEK) == 0) {
        close(*mqserver);
        *mqserver = -1;
      }
    }
  }

  // if we're in a disconnected state, try to reconnect and re-register all buffers
  if (*mqserver < 0) {
    try {
      for (const auto& pt : pts) {
        registerSHMAlloc(mqserver, groupName, wp, true, pt);
      }
    } catch (std::exception& ex) {
      // deliberately swallowed: producers keep running without a consumer
    }
  }
}

// between queue readers and writers, there are only three states of concern:
//   0: both reader and writer are making progress
//   1: the reader is blocked waiting for new values (empty queue)
//   2: the writer is blocked waiting for the reader to catch up (full queue)
#define PRIV_HSTORE_STATE_UNBLOCKED      0
#define PRIV_HSTORE_STATE_READER_WAITING 1
#define PRIV_HSTORE_STATE_WRITER_WAITING 2

// view over one shared-memory queue: element geometry plus pointers to the
// live indexes/wait-state and the element data itself
struct pqueue_config {
  pqueue_config() = default;

  pqueue_config(size_t valuesz, size_t count, uint32_t* wstate, uint32_t* ri, uint32_t* wi, uint8_t* data) :
    valuesz(valuesz), count(count), wstate(wstate), readerIndex(ri), writerIndex(wi), data(data)
  {
  }

  size_t             valuesz     = 0;       // how large is one "value" or queue element?
  size_t             count       = 0;       // how many "values" (of size 'valuesz') are there indexable from 'data'?
  volatile uint32_t* wstate      = nullptr; // inter-process wait state : 0=no waiting, 1=reader waiting, 2=writer waiting
  volatile uint32_t* readerIndex = nullptr; // where is the reader in the data sequence?
  volatile uint32_t* writerIndex = nullptr; // where is the writer in the data sequence?
  uint8_t*           data        = nullptr; // the actual queue data
};

// shared memory queue data
// NOTE(review): this layout is the shared-memory ABI between producer and
// consumer processes — do not reorder or resize fields.
struct ShQueueHeader {
  uint32_t ready;  // set to 1 when the queue has been fully constructed and is ready to read
  size_t   valsz;  // the size of a single "queue value"
  size_t   count;  // the number of queue values defined in the queue
  size_t   metasz; // the size of the following meta-data section
};

// live queue state, placed at the first page boundary after the metadata
// section (shared-memory ABI — do not reorder or resize fields)
struct ShQueueData {
  uint32_t wstate; // inter-process wait state (see PRIV_HSTORE_STATE_*)
  uint32_t ri;     // reader index
  uint32_t wi;     // writer index
  uint32_t unused; // reserved/padding
};

// write data into shared memory
class writer {
private:
  std::string   shmname;
  int           shmfd;
  pqueue_config cfg;
  WaitFn        waitFn;
  WakeFn        wakeFn;

  inline volatile uint32_t* waitState()                     const { return this->cfg.wstate; }
  inline volatile uint32_t* readIndex()                     const { return this->cfg.readerIndex; }
  inline volatile uint32_t* writeIndex()                    const { return this->cfg.writerIndex; }
  inline uint8_t*           value(size_t i)                 const { return this->cfg.data + (i*this->cfg.valuesz); }
  inline uint32_t           nextIndex(volatile uint32_t* i) const { return (*i + 1) % this->cfg.count; }
public:
  writer(const bytes& meta, const std::string& shmname, size_t qvalsz, size_t count, const WaitPolicy wp) : waitFn(hobbes::storage::waitFn(wp)), wakeFn(hobbes::storage::wakeFn(wp)) {
    shm_unlink(shmname.c_str());

    // sections of shared memory should be aligned to page boundaries
    long pagesz = sysconf(_SC_PAGESIZE);
    if (pagesz == -1) {
      throw std::runtime_error("Failed to query system page size for '" + shmname + "': " + strerror(errno));
    }
  
    // create the shared memory region
    int shfd = shm_open(shmname.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
    if (shfd == -1) {
      throw std::runtime_error("Failed to allocate shared memory for '" + shmname + "': " + strerror(errno));
    }
  
    // our meta-data section comes first up to the first page boundary
    // then our data section comes next
    auto metaLen = align<size_t>(sizeof(ShQueueHeader) + meta.size(),  pagesz);
    auto dataLen = align<size_t>(sizeof(ShQueueData)   + qvalsz*count, pagesz);
    size_t memLen  = metaLen + dataLen;
  
    // allocate this much data
    if (ftruncate(shfd, memLen) == -1) {
      throw std::runtime_error("Failed to truncate shared memory for '" + shmname + "': " + strerror(errno));
    }
  
    auto* mem = reinterpret_cast<uint8_t*>(mmap(nullptr, memLen, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, shfd, 0));
    if (mem == MAP_FAILED) {
      throw std::runtime_error("Failed to map bytes out of shared memory for '" + shmname + "': " + strerror(errno));
    }
  
    // write meta data
    auto* hdr = reinterpret_cast<ShQueueHeader*>(mem);
    hdr->valsz  = qvalsz;
    hdr->count  = count;
    hdr->metasz = meta.size();
    memcpy(mem + sizeof(ShQueueHeader), &meta[0], meta.size());
  
    // OK, this queue is fully initialized
    uxchg(&hdr->ready, 1);
  
    // now make this pqueue config
    auto* sqd = reinterpret_cast<ShQueueData*>(mem + metaLen);
  
    this->shmname = shmname;
    this->shmfd   = shfd;
    this->cfg.valuesz     = qvalsz;
    this->cfg.count       = count;
    this->cfg.wstate      = &sqd->wstate;
    this->cfg.readerIndex = &sqd->ri;
    this->cfg.writerIndex = &sqd->wi;
    this->cfg.data        = mem + metaLen + sizeof(ShQueueData);
  }

  ~writer() {
    shm_unlink(this->shmname.c_str());
  }

  inline const pqueue_config& config() const { return this->cfg; }

  uint8_t* next(size_t timeoutNS = 0, const std::function<void()>& timeoutF = [](){}) {
    uint32_t nwi = nextIndex(writeIndex());

    unsigned count = PRIV_HSTORE_SPIN_MIN;
  
    while (PRIV_HSTORE_UNLIKELY(*readIndex() == nwi)) {
      if (count < PRIV_HSTORE_SPIN_MAX) {
        // back-off the writer
        count = spin(count);
      } else {
        // the reader is behind and we've caught up with it, switch into writer-wait mode
        switch (xchg(waitState(), PRIV_HSTORE_STATE_WRITER_WAITING)) {
          case PRIV_HSTORE_STATE_UNBLOCKED:
            // we previously were unblocked
            // make sure that we still need to block the writer (in case the read index moved while we were getting here)
            // then block while we're in writer-wait state
            if (*readIndex() == nwi) {
              (*waitFn)(waitState(), PRIV_HSTORE_STATE_WRITER_WAITING, timeoutNS, timeoutF);
            }
            break;
          case PRIV_HSTORE_STATE_READER_WAITING:
            // we previously were in reader-wait state (this should practically never happen)
            // since we wait to write anyway, unblock the reader and try again
            uxchg(waitState(), PRIV_HSTORE_STATE_UNBLOCKED);
            (*wakeFn)(waitState(), 1);
            break;
        }
      }
    }
    return value(*writeIndex());
  }
  
  uint8_t* pollNext() {
    if (PRIV_HSTORE_UNLIKELY(*readIndex() == nextIndex(writeIndex()))) {
      return nullptr;
    } else {
      return value(*writeIndex());
    }
  }

  void push() {
    uxchg(writeIndex(), nextIndex(writeIndex()));
  
    // when the writer advances, the reader can be unblocked
    if (PRIV_HSTORE_UNLIKELY(xchg(waitState(), PRIV_HSTORE_STATE_UNBLOCKED) == PRIV_HSTORE_STATE_READER_WAITING)) {
      (*wakeFn)(waitState(), 1);
    }
  }
};

// shared memory pages can be marked as representing four possible conditions:
//   1) page is published but state is undetermined (contingent on being able to acquire another page without blocking)
//   2) page is a continuation of its transaction (intermediate in the total transaction)
//   3) page successfully terminates the transaction (commit)
//   4) page prematurely terminates the transaction (rollback)
#define PRIV_HSTORE_PAGE_STATE_TENTATIVE static_cast<uint8_t>(0)
#define PRIV_HSTORE_PAGE_STATE_CONT      static_cast<uint8_t>(1)
#define PRIV_HSTORE_PAGE_STATE_COMMIT    static_cast<uint8_t>(2)
#define PRIV_HSTORE_PAGE_STATE_ROLLBACK  static_cast<uint8_t>(3)

// writers may record data 'unreliably' (non-blocking) or 'reliably' (block iff waiting for consumer to catch up)
enum PipeQOS {
  Reliable = 0, // block (writer::next) when the queue is full — no data loss
  Unreliable    // never block (writer::pollNext) — data is dropped when full
};

// wpipe : a "write pipe" (to write an 'arbitrary-length' sequence of data) on top of writers
// Each queue page carries a trailing 32-bit status word: high byte = page
// state (see PRIV_HSTORE_PAGE_STATE_*), low 24 bits = payload bytes used.
class wpipe {
private:
  writer*               wq;        // underlying fixed-size page queue (not owned)
  uint8_t*              page;      // current page being filled (null if none available)
  size_t                pagesz;    // usable payload bytes per page (excludes the status word)
  uint32_t              offset;    // write offset within the current page
  PipeQOS               qos;       // Reliable blocks on a full queue, Unreliable drops
  size_t                timeoutNS; // block timeout before invoking timeoutF (0 = never)
  std::function<void()> timeoutF;  // callback fired when a blocking wait times out
  size_t                unrtimeNS; // tick of the first drop, for unreliable timeout accounting

  // stamp the current page's trailing status word with state 'c' and the
  // number of payload bytes written so far
  void markPage(uint8_t c) {
    *reinterpret_cast<uint32_t*>(this->page + this->pagesz) = (static_cast<uint32_t>(c) << 24) | this->offset;
  }

  inline bool reliable() const {
    return this->qos == Reliable;
  }

  // finish the current page as a continuation and acquire the next one;
  // returns false (unreliable mode only) when no page could be acquired
  bool stepPage() {
    if (reliable()) {
      markPage(PRIV_HSTORE_PAGE_STATE_CONT);
      this->wq->push();

      this->page   = this->wq->next(this->timeoutNS, this->timeoutF);
      this->offset = 0;
      return true;
    } else {
      // publish tentatively, then resolve the state depending on whether a
      // follow-up page is immediately available (no blocking allowed here);
      // the second markPage re-stamps the page just pushed — the consumer
      // waits for a non-TENTATIVE state before acting (see rpipe::read)
      markPage(PRIV_HSTORE_PAGE_STATE_TENTATIVE);
      this->wq->push();
      uint8_t* npage = this->wq->pollNext();
      markPage(npage != nullptr ? PRIV_HSTORE_PAGE_STATE_CONT : PRIV_HSTORE_PAGE_STATE_ROLLBACK);

      this->page   = npage;
      this->offset = 0;
      return npage != nullptr;
    }
  }
public:
  wpipe(writer* wq, PipeQOS qos = Reliable, size_t timeoutNS = 0, const std::function<void()>& timeoutF = [](){}) : wq(wq), pagesz(wq->config().valuesz - sizeof(uint32_t)), offset(0), qos(qos), timeoutNS(timeoutNS), timeoutF(timeoutF), unrtimeNS(0) {
    if (wq->config().valuesz <= sizeof(uint32_t)) {
      throw std::runtime_error("queue page size too small for use as shared memory pipe");
    }
    this->page = wq->pollNext();
  }

  // terminate the current transaction successfully and acquire a fresh page
  void commit() {
    if (reliable()) {
      markPage(PRIV_HSTORE_PAGE_STATE_COMMIT);
      this->wq->push();
      this->page = this->wq->next(this->timeoutNS, this->timeoutF);
    } else {
      if (PRIV_HSTORE_LIKELY(this->page != nullptr)) {
        markPage(PRIV_HSTORE_PAGE_STATE_COMMIT);
        this->wq->push();
      }
      this->page = this->wq->pollNext();

      // allow unreliable producers to process timeout events
      if (PRIV_HSTORE_UNLIKELY(this->page == nullptr)) {
        if (this->unrtimeNS == 0) {
          this->unrtimeNS = internal::spin::poll_tickNS();
        } else if (internal::spin::poll_tickNS() - this->unrtimeNS >= this->timeoutNS) {
          this->timeoutF();
          this->unrtimeNS = 0;
        }
      }
    }
    this->offset = 0;
  }

  // abandon the current transaction and acquire a fresh page
  void rollback() {
    if (PRIV_HSTORE_LIKELY(this->page != nullptr)) {
      markPage(PRIV_HSTORE_PAGE_STATE_ROLLBACK);
      this->wq->push();
    }
    this->page   = reliable() ? this->wq->next(this->timeoutNS, this->timeoutF) : this->wq->pollNext();
    this->offset = 0;
  }

  // can 'sz' bytes be written without stepping to another page?
  bool hasSpaceFor(size_t sz) const {
    return (this->page != nullptr) && sz < (this->pagesz - this->offset);
  }

  // write a block of bytes within a frame
  // returns false (unreliable mode) if data had to be dropped
  bool write(const uint8_t* src, size_t sz) {
    // just for unreliable pipes, we might enter here without a page
    if (PRIV_HSTORE_UNLIKELY(!this->page)) {
      return false;
    }

    size_t remsz = this->pagesz - this->offset;

    // most of the time we'll be writing small chunks of data
    if (PRIV_HSTORE_LIKELY(sz < remsz)) {
      memcpy(this->page + this->offset, src, sz);
      this->offset += sz;
      return true;
    }

    // now we might cross any number of pages
    // write the src prefix to the remaining page space
    // write all intermediate complete pages
    // write the src suffix to the last page
    memcpy(this->page + this->offset, src, remsz);
    this->offset += remsz;

    size_t so = remsz;
    while (stepPage() && sz - so >= this->pagesz) {
      memcpy(this->page, src + so, this->pagesz);
      this->offset = this->pagesz;
      so += this->pagesz;
    }
    if (this->page == nullptr) {
      return false;
    } else {
      if (so < sz) {
        this->offset = sz - so;
        memcpy(this->page, src + so, this->offset);
      }
      return true;
    }
  }
};

// handle to an open, mapped shared-memory queue (filled in by consumeQueue)
// members are brace-initialized so a default-constructed instance holds
// well-defined values instead of indeterminate ones
struct QueueConnection {
  int         shfd{-1};      // shared memory file descriptor (-1 when unset)
  uint8_t*    data{nullptr}; // base of the mapped region
  size_t      datasz{0};     // total mapped size in bytes
  size_t      pagesz{0};     // system page size used for section alignment
  std::string shmname;       // name the region was opened under
};

// Open and map an existing shared memory queue by name, verifying the
// producer has finished initializing it (header 'ready' flag); throws
// std::runtime_error otherwise.  Resources are released on each error path.
inline QueueConnection consumeQueue(const std::string& shmname) {
  // sections of shared memory are aligned to page boundaries
  long pagesz = sysconf(_SC_PAGESIZE);
  if (pagesz == -1) {
    throw std::runtime_error("Failed to query system page size for '" + shmname + "': " + strerror(errno));
  }

  // see if we can open this shared memory region
  int shfd = shm_open(shmname.c_str(), O_RDWR, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
  if (shfd == -1) {
    throw std::runtime_error("Failed to open shared memory for '" + shmname + "': " + strerror(errno));
  }

  // make sure that the region has been sized
  struct stat msb;
  if (fstat(shfd, &msb) < 0) {
    close(shfd);
    throw std::runtime_error("Failed to stat shared memory for '" + shmname + "': " + strerror(errno));
  }
  if (msb.st_size <= 0) {
    close(shfd);
    throw std::runtime_error("Shared memory for '" + shmname + "' is not ready");
  }

  // map memory for this data, ensure it is in a good state
  auto* mem = reinterpret_cast<uint8_t*>(mmap(nullptr, msb.st_size, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, shfd, 0));
  if (mem == MAP_FAILED) {
    close(shfd);
    throw std::runtime_error("Failed to map bytes out of shared memory for '" + shmname + "': " + strerror(errno));
  }

  // the producer publishes 'ready' only after fully writing the header/metadata
  if (reinterpret_cast<ShQueueHeader*>(mem)->ready == 0u) {
    munmap(mem, msb.st_size);
    close(shfd);
    throw std::runtime_error("Not ready to consume shared memory for '" + shmname + "'");
  }

  QueueConnection c;
  c.shfd    = shfd;
  c.data    = mem;
  c.datasz  = msb.st_size;
  c.pagesz  = pagesz;
  c.shmname = shmname;
  return c;
}

// open the shared memory queue registered for this group by process/thread 'pt'
inline QueueConnection consumeGroup(const std::string& gname, const ProcThread& pt) {
  const auto region = sharedMemName(gname, pt);
  return consumeQueue(region);
}

// read data out of shared memory
// consumer-side view of a queue created by 'writer'; attaches to a mapping
// produced by consumeQueue
class reader {
private:
  int            shfd;   // fd of the mapped shm region (closed on destruction)
  const uint8_t* metad;  // pointer into the mapping at the producer's metadata
  size_t         metasz; // metadata length in bytes
  pqueue_config  cfg;
  WaitFn         waitFn; // blocking-wait strategy (spin or platform/futex)
  WakeFn         wakeFn; // waiter wake strategy

  inline volatile uint32_t* waitState()                     const { return this->cfg.wstate; }
  inline volatile uint32_t* readIndex()                     const { return this->cfg.readerIndex; }
  inline volatile uint32_t* writeIndex()                    const { return this->cfg.writerIndex; }
  inline uint8_t*           value(size_t i)                 const { return this->cfg.data + (i*this->cfg.valuesz); }
  inline uint32_t           nextIndex(volatile uint32_t* i) const { return (*i + 1) % this->cfg.count; }
public:
  // attach to an already-mapped queue; 'qc.data' must outlive this reader
  // since 'metad' and 'cfg' point into the mapping
  reader(const QueueConnection& qc, const WaitPolicy wp) : shfd(qc.shfd), waitFn(hobbes::storage::waitFn(wp)), wakeFn(hobbes::storage::wakeFn(wp)) {
    // prepare to read the queue description
    auto* hdr     = reinterpret_cast<ShQueueHeader*>(qc.data);
    auto         metaLen = align<size_t>(sizeof(ShQueueHeader) + hdr->metasz, qc.pagesz);
    auto*   sqd     = reinterpret_cast<ShQueueData*>(qc.data + metaLen);

    // now we should have enough to read out of this queue
    this->metad           = qc.data + sizeof(ShQueueHeader);
    this->metasz          = hdr->metasz;
    this->cfg.valuesz     = hdr->valsz;
    this->cfg.count       = hdr->count;
    this->cfg.wstate      = &sqd->wstate;
    this->cfg.readerIndex = &sqd->ri;
    this->cfg.writerIndex = &sqd->wi;
    this->cfg.data        = qc.data + metaLen + sizeof(ShQueueData);
  }

  // NOTE(review): closes the fd but does not munmap 'qc.data' — the mapping
  // persists until process exit; confirm intended ownership with callers
  ~reader() {
    close(this->shfd);
  }

  inline const pqueue_config& config() const { return this->cfg; }

  // access queue init data; (null,0) if no data was specified
  using MetaData = std::pair<const uint8_t *, size_t>;
  MetaData meta() const {
    return MetaData(this->metad, this->metasz);
  }

  // get the next value in the queue, blocking if necessary
  // (spins first, then parks via the configured wait strategy; 'timeoutF'
  // fires each time 'timeoutNS' elapses while blocked)
  uint8_t* next(size_t timeoutNS, const std::function<void()>& timeoutF) {
    uint32_t ri = *readIndex();

    unsigned count = PRIV_HSTORE_SPIN_MIN;

    while (PRIV_HSTORE_UNLIKELY(*writeIndex() == ri)) {
      if (count < PRIV_HSTORE_SPIN_MAX) {
        // try back-off the reader
        count = spin(count);
      } else {
        // there's nothing to read, switch into reader-wait mode
        switch (xchg(waitState(), PRIV_HSTORE_STATE_READER_WAITING)) {
          case PRIV_HSTORE_STATE_READER_WAITING:
          // for some reason (e.g. resuming on a shm block which was being operated
          // upon by an writer/reader instance that unnaturally terminated)
          // this reader is in this state but not waiting as in the UNBLOCKED
          // case. Just do UNBLOCKED case to resolve the potential infinite loop.
          case PRIV_HSTORE_STATE_UNBLOCKED:
            // we previously were unblocked
            // make sure that we still need to block the reader (in case the write index moved while we were getting here)
            // then block while we're in reader-wait state
            if (*writeIndex() == ri) {
              (*waitFn)(waitState(), PRIV_HSTORE_STATE_READER_WAITING, timeoutNS, timeoutF);
            }
            break;
          case PRIV_HSTORE_STATE_WRITER_WAITING:
            // we previously were in writer-wait state (this should practically never happen)
            // since we wait to read anyway, unblock the writer and try again
            uxchg(waitState(), PRIV_HSTORE_STATE_UNBLOCKED);
            (*wakeFn)(waitState(), 1);
            break;
        }
      }
    }
    return value(*readIndex());
  }

  // get the next value in the queue if one is present, else null
  uint8_t* pollNext() {
    if (PRIV_HSTORE_UNLIKELY(*writeIndex() == *readIndex())) {
      return nullptr;
    } else {
      return value(*readIndex());
    }
  }

  // remove the next value from the queue (increment the read index)
  void pop() {
    uxchg(readIndex(), nextIndex(readIndex()));

    // when the reader advances, the writer can be unblocked
    if (PRIV_HSTORE_UNLIKELY(xchg(waitState(), PRIV_HSTORE_STATE_UNBLOCKED) == PRIV_HSTORE_STATE_WRITER_WAITING)) {
      (*wakeFn)(waitState(), 1);
    }
  }
};

// rpipe : a "read pipe" on top of readers
// Consumes pages stamped by wpipe: each page's trailing 32-bit status word
// holds the page state in its high byte and the payload length in the low
// 24 bits.
class rpipe {
private:
  reader*        rq;     // underlying page queue (not owned)
  const uint8_t* page;   // current page being consumed (null between transactions)
  size_t         pagesz; // usable payload bytes per page (excludes the status word)
  uint32_t       offset; // read offset within the current page
public:
  rpipe(reader* rq) : rq(rq), page(nullptr), pagesz(rq->config().valuesz - sizeof(uint32_t)), offset(0) {
    if (rq->config().valuesz <= sizeof(uint32_t)) {
      throw std::runtime_error("queue page size too small for use as shared memory pipe");
    }
  }

  // read a range of bytes out of the 'pipe' into a user-supplied buffer
  // Returns the number of bytes copied (may be < sz at a transaction
  // boundary).  '*state' (if non-null) receives the last page state seen.
  size_t read(uint8_t* dst, size_t sz, uint8_t* state, size_t timeoutNS, const std::function<void()>& timeoutF) {
    if (this->page == nullptr) {
      this->page = this->rq->next(timeoutNS, timeoutF);
    }

    size_t doff = 0;

    while (sz != 0 && (this->page != nullptr)) {
      // how much can we copy out of the current page into the dest buffer?
      // (use an unsigned literal in the mask so the shift never lands in the
      // sign bit of int — same resulting mask, 0x00FFFFFF)
      const volatile uint32_t* pend      = reinterpret_cast<const uint32_t*>(this->page + this->pagesz);
      size_t                   rpagesz   = *pend & ~(0xFFu << 24);
      size_t                   availPage = rpagesz - this->offset;
      size_t                   csz       = (sz < availPage) ? sz : availPage;

      // copy as much as we can
      memcpy(dst + doff, this->page + this->offset, csz);

      // step through this segment we've copied
      this->offset += csz;
      doff         += csz;
      sz           -= csz;

      // if we've hit the end of the page, mark it done and then try for the next one
      if (this->offset == rpagesz) {
        // wait for the producer to decide what state the page is in
        while ((*pend >> 24) == PRIV_HSTORE_PAGE_STATE_TENTATIVE);
        uint8_t ps = *pend >> 24;
        if (state != nullptr) *state = ps;

        // if this page is just a continuation, continue reading
        // else we've terminated a transaction
        this->rq->pop();
        this->page   = (ps == PRIV_HSTORE_PAGE_STATE_CONT) ? this->rq->pollNext() : nullptr;
        this->offset = 0;
      }
    }

    return doff;
  }
};

/*****************************
 *
 * store<T> : the main interface for type-directed storage into shared memory pipes
 *
 *****************************/
// primary template: intentionally empty so that unsupported types fail to
// compile; each storable type provides a specialization (P is a SFINAE hook)
template <typename T, typename P = void>
  struct store {
  };

// a pointer-to-const stores like the corresponding pointer-to-non-const
template <typename T> struct store<const T*> : store<T*> {};

// declare storage support for a primitive type T under the hobbes type name n
// (primitives are fixed-size, so they serialize with a single raw byte copy)
#define PRIV_HSTORE_DEFINE_PRIMTYS(T, n) \
  template <> \
    struct store<T> { \
      static const bool can_memcpy = true; \
      static ty::desc    type()                      { return ty::prim(n); } \
      static size_t      size(const T&)              { return sizeof(T); } \
      static bool        write(wpipe& p, const T& x) { return p.write(reinterpret_cast<const uint8_t*>(&x), sizeof(x)); } \
    }

// note: signed/unsigned pairs intentionally map to the same hobbes type name
PRIV_HSTORE_DEFINE_PRIMTYS(bool,     "bool");
PRIV_HSTORE_DEFINE_PRIMTYS(uint8_t,  "byte");
PRIV_HSTORE_DEFINE_PRIMTYS(char,     "char");
PRIV_HSTORE_DEFINE_PRIMTYS(int16_t,  "short");
PRIV_HSTORE_DEFINE_PRIMTYS(uint16_t, "short");
PRIV_HSTORE_DEFINE_PRIMTYS(int32_t,  "int");
PRIV_HSTORE_DEFINE_PRIMTYS(uint32_t, "int");
PRIV_HSTORE_DEFINE_PRIMTYS(int64_t,  "long");
PRIV_HSTORE_DEFINE_PRIMTYS(uint64_t, "long");
#if defined(__APPLE__) && defined(__MACH__)
// on macOS, size_t is a distinct type from uint64_t and needs its own instance
PRIV_HSTORE_DEFINE_PRIMTYS(size_t, "long");
#endif
PRIV_HSTORE_DEFINE_PRIMTYS(__int128, "int128");
PRIV_HSTORE_DEFINE_PRIMTYS(float,    "float");
PRIV_HSTORE_DEFINE_PRIMTYS(double,   "double");

// store unit
// (unit carries no information: it occupies zero bytes and writes always succeed)
template <>
  struct store<unit> {
    static const bool can_memcpy = false;
    static ty::desc type()                     { return ty::prim("unit"); }
    static size_t   size()                     { return 0; }
    static bool     write(wpipe&, const unit&) { return true; }
  };

// store fixed-length arrays
// (described as the element type plus a static length)
template <typename T, size_t N>
  struct fixedArrTyDesc { static ty::desc type() { return ty::array(store<T>::type(), ty::nat(N)); } };

// memcpy-able elements: the whole array is one contiguous bulk write
template <typename T, size_t N>
  struct store<T[N], typename tbool<store<T>::can_memcpy>::type> : public fixedArrTyDesc<T,N> {
    static const bool can_memcpy = true;
    static size_t size (const T (&)[N])           { return sizeof(T)*N; }
    static bool   write(wpipe& p, const T (&v)[N]) { return p.write(reinterpret_cast<const uint8_t*>(v), sizeof(T)*N); }
  };
template <typename T, size_t N>
  struct store<std::array<T, N>, typename tbool<store<T>::can_memcpy>::type> : public fixedArrTyDesc<T, N> {
    static const bool can_memcpy = true;
    static size_t size (const std::array<T, N>&)             { return sizeof(T)*N; }
    static bool   write(wpipe& p, const std::array<T, N>& x) { return p.write(reinterpret_cast<const uint8_t*>(&x), size(x)); }
  };
// non-memcpy-able elements: size and write each element individually
template <typename T, size_t N>
  struct store<T[N], typename tbool<!store<T>::can_memcpy>::type> : public fixedArrTyDesc<T,N> {
    static const bool can_memcpy = false;
    static size_t size(const T (&v)[N])            { size_t s = 0; for (size_t i = 0; i < N; ++i) { s += store<T>::size(v[i]); } return s; }
    static bool   write(wpipe& p, const T (&v)[N]) { for (size_t i = 0; i < N; ++i) { if (!store<T>::write(p, v[i])) { return false; } } return true; }
  };
template <typename T, size_t N>
  struct store<std::array<T, N>, typename tbool<!store<T>::can_memcpy>::type> : public fixedArrTyDesc<T, N> {
    static const bool can_memcpy = false;
    static size_t size (const std::array<T, N>& x)           { size_t n = 0; for (size_t i = 0; i < N; ++i) { n += store<T>::size(x[i]); } return n; }
    static bool   write(wpipe& p, const std::array<T, N>& x) { for (size_t i = 0; i < N; ++i) { if (!store<T>::write(p, x[i])) return false; } return true; }
  };

// store strings
// (written as a size_t length prefix followed by the raw character data,
//  matching the hobbes "[char]" array representation; no NUL is written)
template <>
  struct store<const char*> {
    static const bool can_memcpy = false;
    static ty::desc type() { return ty::array(ty::prim("char")); }
    static size_t size(const char* s) { return sizeof(size_t) + strlen(s); }
    static bool write(wpipe& p, const char* s) {
      size_t len = strlen(s);
      if (!store<size_t>::write(p, len)) {
        return false;
      }
      return p.write(reinterpret_cast<const uint8_t*>(s), len);
    }
  };
template <>
  struct store<char*> {
    static const bool can_memcpy = false;
    static ty::desc type() { return ty::array(ty::prim("char")); }
    static size_t size(char* s) { return sizeof(size_t) + strlen(s); }
    static bool write(wpipe& p, char* s) {
      size_t len = strlen(s);
      if (!store<size_t>::write(p, len)) {
        return false;
      }
      return p.write(reinterpret_cast<const uint8_t*>(s), len);
    }
  };
template <>
  struct store<std::string> {
    static const bool can_memcpy = false;
    static ty::desc type() { return ty::array(ty::prim("char")); }
    static size_t size(const std::string& s) { return sizeof(size_t) + s.size(); }
    static bool write(wpipe& p, const std::string& s) {
      if (!store<size_t>::write(p, s.size())) {
        return false;
      }
      return p.write(reinterpret_cast<const uint8_t*>(s.data()), s.size());
    }
  };

// support storage of vectors
// memcpy-able elements: a length prefix followed by one bulk copy of the data
template <typename T>
  struct store<std::vector<T>, typename tbool<store<T>::can_memcpy>::type> {
    static const bool can_memcpy = false; // dynamic length => the container itself can't be bulk-copied
    static ty::desc type ()                                   { return ty::array(store<T>::type()); }
    static size_t   size (const std::vector<T>& xs)           { return sizeof(size_t) + (xs.size() * sizeof(T)); }
    // use xs.data() rather than &xs[0]: data() is valid even for an empty vector
    static bool     write(wpipe& p, const std::vector<T>& xs) { size_t n = xs.size(); return store<size_t>::write(p, n) && p.write(reinterpret_cast<const uint8_t*>(xs.data()), n * sizeof(T)); }
  };

// non-memcpy-able elements: length prefix, then each element encoded in turn
template <typename T>
  struct store<std::vector<T>, typename tbool<!store<T>::can_memcpy>::type> {
    static const bool can_memcpy = false;
    static ty::desc type() { return ty::array(store<T>::type()); }
    // total size: the length prefix plus each element's own encoded size
    static size_t size(const std::vector<T>& xs) {
      size_t total = sizeof(size_t);
      for (size_t i = 0; i < xs.size(); ++i) {
        total += store<T>::size(xs[i]);
      }
      return total;
    }
    // write the length prefix, then each element; stop at the first failure
    static bool write(wpipe& p, const std::vector<T>& xs) {
      if (!store<size_t>::write(p, xs.size())) {
        return false;
      }
      for (size_t i = 0; i < xs.size(); ++i) {
        if (!store<T>::write(p, xs[i])) {
          return false;
        }
      }
      return true;
    }
  };

template <>
  struct store<std::vector<bool>> {
    static const bool can_memcpy = false;
    static ty::desc type() { return ty::array(store<bool>::type()); }
    static size_t size(const std::vector<bool>& xs) { return sizeof(size_t) + (xs.size() * sizeof(bool)); }
    static bool write(wpipe& p, const std::vector<bool>& xs) {
      size_t n = xs.size();
      if (!store<size_t>::write(p, n)) { return false; }
      for (const auto& x : xs) {
        if (!store<bool>::write(p, x)) {
          return false;
        }
      }
      return true;
    }
  };

// hobbes arrays of memcpy-able elements: the whole object (length header plus
// element data) is written with a single bulk copy
// NOTE(review): assumes the elements directly follow the length field in
// array<T>'s layout with no intervening padding — confirm against reflect.H
template <typename T>
  struct store<array<T>*, typename tbool<store<T>::can_memcpy>::type> {
    static const bool can_memcpy = false;
    static ty::desc type() { return ty::array(store<T>::type()); }
    static size_t size(const array<T>* xs) { return sizeof(size_t) + xs->size * sizeof(T); }
    static bool write(wpipe& p, const array<T>* xs) { return p.write(reinterpret_cast<const uint8_t*>(xs), size(xs)); }
  };

// hobbes arrays of non-memcpy-able elements: length prefix, then each element
// encoded in turn
template <typename T>
  struct store<array<T>*, typename tbool<!store<T>::can_memcpy>::type> {
    static const bool can_memcpy = false;
    static ty::desc type() { return ty::array(store<T>::type()); }
    // length prefix plus the encoded size of each element
    // (iterate elements explicitly: 'xs' is a raw pointer, and range-for over
    //  a pointer has no begin()/end(), so the previous range-for could not work)
    static size_t size(const array<T>* xs) {
      size_t total = sizeof(size_t);
      for (size_t i = 0; i < static_cast<size_t>(xs->size); ++i) {
        total += store<T>::size(xs->data[i]);
      }
      return total;
    }
    // write the length prefix, then each element; stop at the first failure
    static bool write(wpipe& p, const array<T>* xs) {
      if (!store<size_t>::write(p, xs->size)) {
        return false;
      }
      for (size_t i = 0; i < static_cast<size_t>(xs->size); ++i) {
        if (!store<T>::write(p, xs->data[i])) {
          return false;
        }
      }
      return true;
    }
  };

/*
 * store tuples (this gets a little complicated)
 */

// a generic method to write tuples as a last resort
// (recursively walks fields i..n-1, naming them ".f0", ".f1", ... and
//  recording each field's static offset from the tuple's offset list)
template <size_t i, size_t n, typename ... Ts>
  struct storeTupleDef {
    using H = typename nth<i, Ts...>::type;
    using offs = typename tuple<Ts...>::offs;
    using Recurse = storeTupleDef<i + 1, n, Ts...>;

    static void fieldDefs(ty::Struct::Fields* fs) {
      fs->push_back(ty::Struct::Field(".f" + hobbes::string::from(i), static_cast<int>(offsetAt<i, offs>::value), store<H>::type()));
      Recurse::fieldDefs(fs);
    }
    static ty::desc type() { ty::Struct::Fields fs; fieldDefs(&fs); return ty::record(fs); }

    // total encoded size: this field plus all remaining fields
    static size_t size(const tuple<Ts...>& x) {
      return store<H>::size(x.template at<i>()) + Recurse::size(x);
    }

    // write this field, then the remaining fields (short-circuits on failure)
    static bool incrWrite(wpipe& p, const tuple<Ts...>& x) {
      return store<H>::write(p, x.template at<i>()) &&
             Recurse::incrWrite(p, x);
    }
  };
// base case: i == n, no fields left
template <size_t n, typename ... Ts>
  struct storeTupleDef<n, n, Ts...> {
    static void fieldDefs(ty::Struct::Fields*)         { }
    static ty::desc type()                             { return ty::prim("unit"); }
    static size_t size(const tuple<Ts...>&)            { return  0; }
    static bool incrWrite(wpipe&, const tuple<Ts...>&) { return true; }
  };

// when we store standard-layout tuples, we can only bulk memcpy them if:
//   * all fields up to the last one are packed (no internal padding, but trailing padding is ok since we can truncate it with memcpy)
//   * all field types can be memcopied
template <typename T, typename P = void>
  struct storeStdLayoutTuple { };

// true iff every listed field type is itself memcpy-able
template <typename ... Ts>
  struct all_memcpyableFs { static const bool value = true; };
template <typename T, typename ... Ts>
  struct all_memcpyableFs<T, Ts...> { static const bool value = store<T>::can_memcpy && all_memcpyableFs<Ts...>::value; };

// the whole tuple can be memcopied
//   e.g.: int*int
template <typename ... Ts>
  struct storeStdLayoutTuple<tuple<Ts...>, typename tbool<tuple<Ts...>::packed && all_memcpyableFs<Ts...>::value>::type> {
    static const bool can_memcpy = true;
    static ty::desc type()                                 { return storeTupleDef<0, sizeof...(Ts), Ts...>::type(); }
    static size_t   size(const tuple<Ts...>&)              { return sizeof(tuple<Ts...>); }
    static bool     write(wpipe& p, const tuple<Ts...>& t) { return p.write(reinterpret_cast<const uint8_t*>(&t), sizeof(t)); }
  };

// the tuple prefix can be memcopied but the whole thing can't because it has trailing padding
//   e.g.: double*int
// (only offs::size bytes are written, truncating the trailing padding)
template <typename ... Ts>
  struct storeStdLayoutTuple<tuple<Ts...>, typename tbool<!tuple<Ts...>::packed && tuple<Ts...>::offs::packed && all_memcpyableFs<Ts...>::value>::type> {
    static const bool can_memcpy = false;
    static ty::desc type()                                 { return storeTupleDef<0, sizeof...(Ts), Ts...>::type(); }
    static size_t   size(const tuple<Ts...>&)              { return tuple<Ts...>::offs::size; }
    static bool     write(wpipe& p, const tuple<Ts...>& t) { return p.write(reinterpret_cast<const uint8_t*>(&t), tuple<Ts...>::offs::size); }
  };

// the tuple can't be memcopied: fall back to per-field size/write
template <typename ... Ts>
  struct storeStdLayoutTuple<tuple<Ts...>, typename tbool<(!tuple<Ts...>::packed && !tuple<Ts...>::offs::packed) || !all_memcpyableFs<Ts...>::value>::type> {
    static const bool can_memcpy = false;
    using Reflect = storeTupleDef<0, sizeof...(Ts), Ts...>;
    static ty::desc type()                                 { return Reflect::type(); }
    static size_t   size(const tuple<Ts...>& t)            { return Reflect::size(t); }
    static bool     write(wpipe& p, const tuple<Ts...>& t) { return Reflect::incrWrite(p, t); }
  };

// many types can be stored "as if" some standard layout tuple
// those types can describe themselves however they would like, then defer to this implementation to actually write and compute size
// NOTE(review): the reinterpret_cast assumes R is layout-compatible with
// tuple type T — users of this helper must guarantee that
template <typename R, typename T>
  struct storeAsIfTuple {
    static const bool can_memcpy = storeStdLayoutTuple<T>::can_memcpy;

    static size_t size (const R& r)           { return storeStdLayoutTuple<T>::size(*reinterpret_cast<const T*>(&r)); }
    static bool   write(wpipe& p, const R& r) { return storeStdLayoutTuple<T>::write(p, *reinterpret_cast<const T*>(&r)); }
  };

// store pairs (as the equivalent two-field tuple)
template <typename U, typename V>
  struct store<std::pair<U,V>> : public storeAsIfTuple<std::pair<U,V>, tuple<U,V>> {
    static ty::desc type() { return storeStdLayoutTuple<tuple<U,V>>::type(); }
  };

// store reflective structs
#define DEFINE_HSTORE_STRUCT(T, FIELDS...) DEFINE_STRUCT(T, FIELDS)

struct defStructF {
  ty::Struct::Fields* fs;
  defStructF(ty::Struct::Fields* fs) : fs(fs) { }
  template <typename T>
    void visit(const char* fname) {
      this->fs->push_back(ty::Struct::Field(fname, -1, store<T>::type()));
    }
};
// a reflective struct (DEFINE_STRUCT) stores as if it were its field tuple,
// while its type description carries the declared field names via T::meta
template <typename T>
  struct store<T, typename tbool<T::is_hmeta_struct>::type> : public storeAsIfTuple<T, typename T::as_tuple_type> {
    static std::string storageName() { return T::_hmeta_struct_type_name(); }

    static ty::desc type() {
      ty::Struct::Fields fs;
      defStructF df(&fs);
      T::meta(df);
      return ty::record(fs);
    }
  };

// this macro is deprecated, but has been used to define a struct to record s.t. it stores via memcpy
// for backward compatibility, assert that a packed struct is packed
// this breaks if users don't declare structs in a way that would be packed without the packed annotation
// but allows all structs that happen to be packed to share the same memcpy logic
#define DEFINE_PACKED_HSTORE_STRUCT(T, FIELDS...) \
  DEFINE_STRUCT(T, FIELDS); \
  static_assert(::hobbes::storage::storeStdLayoutTuple<T::as_tuple_type>::can_memcpy, "declaration of packed struct found inconsistent with internal padding (add explicit fields or re-order fields to eliminate padding)")

// support storing "struct views" (select fields / const accessor functions) out of a type
// resolve: maps a member pointer (data member or const accessor function) to
// itself so that decltype selects the matching 'member' specialization below
template <typename T, typename F>
  static F T::* resolve(F T::*);
template <typename T, typename R>
  static auto resolve(R (T::*K)() const) -> decltype(K);

template <typename T> struct member;

// data member: read by dereferencing the member pointer
template <typename T, typename F>
  struct member<F T::*> {
    using type = F;
    using Acc = F T::*;
    static const F& read(const T& x, Acc a) { return x.*a; }
  };
// const accessor function: read by calling it
template <typename T, typename F>
  struct member<F (T::*)() const> {
    using type = F;
    using Acc = F (T::*)() const;
    static F read(const T& x, Acc a) { return (x.*a)(); }
  };

#define PRIV_HSTORE_DSV_T(f)         member<decltype(resolve(&SelfT::f))>::type
#define PRIV_HSTORE_DSV_READ(f)      member<decltype(resolve(&SelfT::f))>::read(x, std::integral_constant<member<decltype(resolve(&SelfT::f))>::Acc, &SelfT::f>::value)
#define PRIV_HSTORE_DSV_DEF_FIELD(f) fs.push_back(ty::Struct::Field(#f, -1, store<PRIV_HSTORE_DSV_T(f)>::type()));
#define PRIV_HSTORE_DSV_SIZE(f)      + store< PRIV_HSTORE_DSV_T(f) >::size(PRIV_HSTORE_DSV_READ(f))
#define PRIV_HSTORE_DSV_WRITE(f)     && store< PRIV_HSTORE_DSV_T(f) >::write(p, PRIV_HSTORE_DSV_READ(f))

#define DEFINE_HSTORE_STRUCT_VIEW(T, FIELDS...) \
  namespace hobbes { namespace storage { \
    template <> \
      struct store<T> { \
        typedef T SelfT; \
        static std::string storageName() { return #T; } \
        static ty::desc type() { ty::Struct::Fields fs; PRIV_HPPF_MAPL(PRIV_HSTORE_DSV_DEF_FIELD, FIELDS); return ty::record(fs); } \
        static size_t size(const T& x) { return 0 PRIV_HPPF_MAPL(PRIV_HSTORE_DSV_SIZE, FIELDS); } \
        static bool write(wpipe& p, const T& x) { return true PRIV_HPPF_MAPL(PRIV_HSTORE_DSV_WRITE, FIELDS); } \
      }; \
    template <> \
      struct store<T*> { \
        typedef T SelfT; \
        static std::string storageName() { return #T; } \
        static ty::desc type() { ty::Struct::Fields fs; PRIV_HPPF_MAPL(PRIV_HSTORE_DSV_DEF_FIELD, FIELDS); return ty::record(fs); } \
        static size_t size(const T* px) { const T& x = *px; return 0 PRIV_HPPF_MAPL(PRIV_HSTORE_DSV_SIZE, FIELDS); } \
        static bool write(wpipe& p, const T* px) { const T& x = *px; return true PRIV_HPPF_MAPL(PRIV_HSTORE_DSV_WRITE, FIELDS); } \
      }; \
    template <> \
      struct store<const T*> { \
        typedef T SelfT; \
        static std::string storageName() { return #T; } \
        static ty::desc type() { ty::Struct::Fields fs; PRIV_HPPF_MAPL(PRIV_HSTORE_DSV_DEF_FIELD, FIELDS); return ty::record(fs); } \
        static size_t size(const T* px) { const T& x = *px; return 0 PRIV_HPPF_MAPL(PRIV_HSTORE_DSV_SIZE, FIELDS); } \
        static bool write(wpipe& p, const T* px) { const T& x = *px; return true PRIV_HPPF_MAPL(PRIV_HSTORE_DSV_WRITE, FIELDS); } \
      }; \
  }}
#define HSTORE_MAKE_FRIEND template <typename _> friend class hobbes::storage::store;

// store std::tuples
// (std::tuples don't have standard layout so we need to store them the hard way)
template <size_t i, size_t e, typename ... Ts>
  struct storeTuple {
    static size_t size(const std::tuple<Ts...>& t) {
      return store<typename std::tuple_element<i, std::tuple<Ts...>>::type>::size(std::get<i>(t)) +
             storeTuple<i+1, e, Ts...>::size(t);
    }
    static bool write(wpipe& p, const std::tuple<Ts...>& t) {
      return store<typename std::tuple_element<i, std::tuple<Ts...>>::type>::write(p, std::get<i>(t)) &&
             storeTuple<i+1, e, Ts...>::write(p, t);
    }
  };
template <size_t e, typename ... Ts>
  struct storeTuple<e, e, Ts...> {
    static size_t size(const std::tuple<Ts...>&) { return 0; }
    static bool write(wpipe&, const std::tuple<Ts...>&) { return true; }
  };
template <typename ... Ts>
  struct store< std::tuple<Ts...> > {
    static ty::desc type() { return storeStdLayoutTuple<tuple<Ts...>>::type(); }

    static size_t size(const std::tuple<Ts...>& t) {
      return storeTuple<0, sizeof...(Ts), Ts...>::size(t);
    }
    static bool write(wpipe& p, const std::tuple<Ts...>& t) {
      return storeTuple<0, sizeof...(Ts), Ts...>::write(p, t);
    }
  };

// store enumerations
// (a reflective enum serializes as its underlying representation type rep_t)
template <typename T>
  struct store<T, typename tbool<T::is_hmeta_enum>::type> {
    static const bool can_memcpy = true;
    
    static ty::desc type()                      { return ty::enumdef(store<typename T::rep_t>::type(), T::meta()); }
    static size_t   size(const T&)              { return sizeof(typename T::rep_t); }
    static bool     write(wpipe& p, const T& x) { return p.write(reinterpret_cast<const uint8_t*>(&x), sizeof(x)); }
  };

// store variants
// storeVariantDef enumerates constructors ".f0", ".f1", ... with ids 0..n-1
template <size_t i, size_t n, typename ... Ts>
  struct storeVariantDef {
    using H = typename nth<i, Ts...>::type;
    using Recurse = storeVariantDef<i + 1, n, Ts...>;

    static void ctorDefs(ty::Variant::Ctors* cs) {
      cs->push_back(ty::Variant::Ctor(".f" + hobbes::string::from(i), static_cast<int>(i), store<H>::type()));
      Recurse::ctorDefs(cs);
    }
    static ty::desc type() { ty::Variant::Ctors cs; ctorDefs(&cs); return ty::variant(cs); }
  };
// base case: no constructors left (an empty variant is "void")
template <size_t n, typename ... Ts>
  struct storeVariantDef<n, n, Ts...> {
    static void     ctorDefs(ty::Variant::Ctors*) { }
    static ty::desc type()                        { return ty::prim("void"); }
  };

// case functions dispatched by variant<Ts...>::apply to the active alternative:
// measure the active payload's encoded size
template <size_t tag, typename T, typename M>
  struct variantGenSize {
    static size_t fn(T* vd, int) {
      return store<T>::size(*vd);
    }
  };
// write the 32-bit tag followed by the active payload
template <size_t tag, typename T, typename M>
  struct variantGenWrite {
    static bool fn(T* vd, wpipe* f) {
      return store<uint32_t>::write(*f, static_cast<uint32_t>(tag)) && store<T>::write(*f, *vd);
    }
  };
template <typename ... Ts>
  struct store<variant<Ts...>> : public storeVariantDef<0, sizeof...(Ts), Ts...> {
    static const bool can_memcpy = false;

    // encoded size: the 32-bit tag plus the active payload's size
    static size_t size(const variant<Ts...>& x) {
      return sizeof(uint32_t) + x.template apply<size_t, variantGenSize, void, int>(0);
    }

    // the tag itself is emitted by variantGenWrite::fn
    static bool write(wpipe& p, const variant<Ts...>& x) {
      return x.template apply<bool, variantGenWrite, void, wpipe*>(&p);
    }
  };

// store variants with named constructors
struct descVariantF {
  ty::Variant::Ctors* ctors;
  descVariantF(ty::Variant::Ctors* ctors) : ctors(ctors) { }

  template <typename T>
    void ctor(const char* n, int id) {
      this->ctors->push_back(ty::Variant::Ctor(n, id, store<T>::type()));
    }
};

// a reflective variant (named constructors) stores exactly like its
// positional variant representation but describes itself with ctor names
template <typename T>
  struct store<T, typename tbool<T::is_hmeta_variant>::type> {
    using VT = typename T::as_variant_type;
    static const bool can_memcpy = store<VT>::can_memcpy;

    static ty::desc type() {
      ty::Variant::Ctors cs;
      descVariantF f(&cs);
      T::meta(f);
      return ty::variant(cs);
    }

    // NOTE(review): assumes T is layout-compatible with its as_variant_type
    static size_t size (const T& x)           { return store<VT>::size(*reinterpret_cast<const VT*>(&x)); }
    static bool   write(wpipe& p, const T& x) { return store<VT>::write(p, *reinterpret_cast<const VT*>(&x)); }
  };

// store recursive types
// recursion: a type-erased back-reference to an enclosing recursive value
struct recursion {
  void* value;
  recursion(void* x) : value(x) { }
};

// recursive<T>: marks a value as the binding point of a recursive (mu) type
template <typename T>
  struct recursive {
    using value_type = T;
    value_type value;
    recursive(const value_type& x) : value(x) { }
  };

// the recursion point is described as the type variable "x"; its size/write
// handlers are thread-local function pointers installed by the enclosing
// store<recursive<T>> while it is sizing/writing, dynamically binding the
// back-reference to the innermost recursive type
template <>
  struct store<recursion> {
    static ty::desc type() {
      return ty::var("x");
    }

    using RecSizeF = size_t (*)(const recursion &);
    // per-thread slot for the active size handler (null outside a recursive store)
    static RecSizeF& sizeF() {
      thread_local RecSizeF fn = nullptr;
      return fn;
    }
    static size_t size(const recursion& x) {
      return sizeF()(x);
    }

    using RecWriteF = bool (*)(wpipe &, const recursion &);
    // per-thread slot for the active write handler
    static RecWriteF& writeF() {
      thread_local RecWriteF fn = nullptr;
      return fn;
    }
    static bool write(wpipe& p, const recursion& x) {
      return writeF()(p, x);
    }
  };

template <typename T>
  struct store<recursive<T>> {
    // a recursive value is a mu-type binding the variable "x" (matching the
    // ty::var("x") produced by store<recursion>::type())
    // [fixed: this was declared 'static void encode(bytes*)' yet returned a
    //  value — ill-formed when instantiated; every other store<> exposes type()]
    static ty::desc type() {
      return ty::recursive("x", store<T>::type());
    }
    // trampoline with the recursion-point signature: recover the concrete
    // recursive<T> from the type-erased pointer and measure it
    static size_t recSize(const recursion& x) {
      return size(*reinterpret_cast<const recursive<T>*>(x.value));
    }
    static size_t size(const recursive<T>& x) {
      // install our trampoline for nested recursion points, restoring the
      // previously installed handler afterward
      using RSF = typename store<recursion>::RecSizeF;
      RSF sf = store<recursion>::sizeF();
      store<recursion>::sizeF() = &store<recursive<T>>::recSize;
      size_t r = store<T>::size(x.value);
      store<recursion>::sizeF() = sf;
      return r;
    }

    // trampoline with the recursion-point signature for writing
    static bool recWrite(wpipe& p, const recursion& x) {
      return write(p, *reinterpret_cast<const recursive<T>*>(x.value));
    }
    static bool write(wpipe& p, const recursive<T>& x) {
      // same save/install/restore protocol as size()
      using RWF = typename store<recursion>::RecWriteF;
      RWF sf = store<recursion>::writeF();
      store<recursion>::writeF() = &store<recursive<T>>::recWrite;
      bool r = store<T>::write(p, x.value);
      store<recursion>::writeF() = sf;
      return r;
    }
  };

// store opaque type aliases
// (an alias is described as a named primitive over its representation type;
//  sizing and writing simply delegate to the representation held in '.value')
template <typename T>
  struct store<T, typename tbool<T::is_hmeta_alias>::type> {
    using RT = typename T::type;
    static const bool can_memcpy = store<RT>::can_memcpy;

    static ty::desc type() {
      return ty::prim(T::name(), store<RT>::type());
    }
    static size_t size(const T& x) {
      return store<RT>::size(x.value);
    }
    static bool write(wpipe& p, const T& x) {
      return store<RT>::write(p, x.value);
    }
  };

// support values carrying field names (easier to record structs without making an explicit struct type)
// F is a compile-time string type naming the field; only the value is stored
template <typename F, typename T>
  struct namedValue {
    T value;
    namedValue(const T& v) : value(v) { }
  };

// a named value stores exactly like its payload (the name only affects how
// the payload field is described, via hstore_argl_name)
template <typename F, typename T>
  struct store<namedValue<F,T>> {
    static ty::desc type ()                                   { return store<T>::type(); }
    static size_t   size (const namedValue<F,T>& x)           { return store<T>::size(x.value); }
    static bool     write(wpipe& p, const namedValue<F,T>& x) { return store<T>::write(p, x.value); }
  };

#define HNAME(N,E) ::hobbes::storage::namedValue<PRIV_HPPF_TSTR(#N),decltype(E)>(E)

// allow inference of log statement names from payload types
using tynamehints = void (*)(std::vector<std::string> *);
// SFINAE probe: detects whether store<T> declares a static storageName()
// (the pointer overload is preferred when it exists; otherwise "" is returned)
template <typename T>
  struct nameAt {
    template <typename U, std::string (*P)()> struct HasStorageName { };
    template <typename U> static std::string is(const HasStorageName<U, &U::storageName>*) { return U::storageName(); }
    template <typename U> static std::string is(...)                                       { return ""; }
  };

// collect T's storage name (if it declares one) into ns
template <typename T>
  struct storeNames {
    static void accum(std::vector<std::string>* ns) {
      std::string n = nameAt<T>::template is<store<T>>(0);
      if (!n.empty()) {
        ns->push_back(n);
      }
    }
  };

// make a function for describing store payload types
// default case: anonymous payloads are named by formatting their index
// through the caller-supplied printf format (e.g. ".f%lld")
template <typename T>
  struct hstore_argl_name {
    static const bool isTuple = true;
    static void nameDesc(const char* prefix, size_t idx, char* buffer, size_t buflen) {
      // the format strings use %lld, so pass a signed long long to match
      // (passing unsigned long long to %lld is a format/argument mismatch)
      snprintf(buffer, buflen, prefix, static_cast<long long>(idx));
    }
  };
// named payloads use the compile-time field name F
template <typename F, typename T>
  struct hstore_argl_name<namedValue<F,T>> {
    static const bool isTuple = false;
    static void nameDesc(const char*, size_t, char* buffer, size_t buflen) {
      // snprintf (unlike strncpy) guarantees NUL termination on truncation
      snprintf(buffer, buflen, "%s", F::str());
    }
  };

// compile-time description of a storage statement's payload type list:
//   count     - number of payload values
//   isTuple   - true when no payload carries an explicit field name
//   fields    - append one ty::Struct field per payload (named via hstore_argl_name)
//   nameHints - collect storage names declared by payload types
template <typename ... Ts>
  struct hstore_payload_types {
    using head_type = unit;
    static const size_t count = 0;
    static const bool isTuple = true;
    static void fields(const char*,size_t,ty::Struct::Fields*) {}
    static void nameHints(std::vector<std::string>*) {}
  };
template <typename T, typename ... Ts>
  struct hstore_payload_types<T,Ts...> {
    using head_type = T;
    static const size_t count = 1 + hstore_payload_types<Ts...>::count;
    static const bool isTuple = hstore_argl_name<T>::isTuple && hstore_payload_types<Ts...>::isTuple;
    static void fields(const char* fmt, size_t f, ty::Struct::Fields* fs) {
      char fn[128];
      hstore_argl_name<T>::nameDesc(fmt, f, fn, sizeof(fn));
      fs->push_back(ty::Struct::Field(fn, -1, store<T>::type()));

      hstore_payload_types<Ts...>::fields(fmt, f+1, fs);
    }
    static void nameHints(std::vector<std::string>* ns) {
      storeNames<T>::accum(ns);
      hstore_payload_types<Ts...>::nameHints(ns);
    }
  };
// deduce the payload type list from the actual argument values
template <typename ... Ts>
  hstore_payload_types<Ts...> makePayloadTypes(const Ts&...) {
    return hstore_payload_types<Ts...>();
  }

using tydescfn = void (*)(bytes *, std::string *);
// build the type description for a statement's payload list:
//   several values, or any named value -> a record
//   no values                          -> unit
//   exactly one anonymous value        -> that value's own type
// then optionally encode it into 'e' and/or render it as text into 'd'
// (either pointer may be null to skip that output)
template <typename TyList>
  void makeTyDescF(bytes* e, std::string* d) {
    ty::desc td;

    if (!TyList::isTuple || TyList::count > 1) {
      ty::Struct::Fields fs;
      TyList::fields(TyList::isTuple ? ".f%lld" : "field%lld", 0, &fs);
      td = ty::record(fs);
    } else if (TyList::count == 0) {
      td = ty::prim("unit");
    } else {
      td = store<typename TyList::head_type>::type();
    }

    if (e) {
      ty::encode(td, e);
    }
    if (d) {
      *d = ty::show(td);
    }
  }

// forces the static 'id' member to be ODR-used so its registering initializer runs
template <uint32_t* X>
  uint32_t forceRegistration() { return *X; }
// one instance per storage site, identified by group, source file/line,
// statement name, flags, format string, and payload type list
template <typename Group, Group* G, typename File, uint32_t Line, typename StmtName, size_t Flags, typename FormatStr, typename ArgTyList>
  struct StorageStatement {
    static uint32_t id;
    StorageStatement() { forceRegistration<&id>(); }
  };
// static registration: the statement id is allocated in the group during static init
template <typename Group, Group* G, typename File, uint32_t Line, typename StmtName, size_t Flags, typename FormatStr, typename ArgTyList>
  uint32_t StorageStatement<Group, G, File, Line, StmtName, Flags, FormatStr, ArgTyList>::id = G->allocateStorageStatement(File::str(), Line, StmtName::str(), Flags, FormatStr::str(), &makeTyDescF<ArgTyList>, &ArgTyList::nameHints);

// allow static registration of storage points
// commit policy for a storage group (serialized into the group metadata)
enum CommitMethod {
  AutoCommit = 0,
  ManualCommit
};

template <typename Name, CommitMethod cm>
struct StorageGroup {
  // static per-statement registration data (serialized by prepareMeta)
  struct StmtData {
    tydescfn    tdesc;  // emits the statement's encoded type and/or text rendering
    uint64_t    flags;
    std::string fmtstr; // text display hint format string
    std::string file;   // source location of the statement
    uint32_t    line;
    uint32_t    id;     // id allocated at registration time
  };
  using StorageStatements = std::map<std::string, StmtData>;

  StorageStatements* statements; // null until statements are registered
  PipeQOS            qos;
  size_t             mempages;   // requested shared-memory page count for the queue
  WaitPolicy         wp;
  bool               enabled;

  std::mutex                 mqmtx;    // guards queue setup in out()
  int                        mqserver;
  static thread_local wpipe* pipe;     // per-thread write pipe into the group's queue
  static std::vector<ProcThread> pts;

  constexpr StorageGroup(size_t pagec, const PipeQOS qos) : StorageGroup(pagec, qos, Platform) {}
  constexpr StorageGroup(size_t pagec, const PipeQOS qos, const WaitPolicy wp)
    : statements(nullptr), qos(qos), mempages(pagec), wp(wp), enabled(true), mqserver(-1) {}

  ~StorageGroup() {
    delete this->statements;
    // NOTE(review): 'pipe' is thread_local, so this frees only the destroying
    // thread's pipe — confirm other threads' pipes are reclaimed elsewhere
    delete this->pipe;
  }

  void prepareMeta(bytes* meta) {
    if (!this->statements) return; // if nothing to record, nothing to prepare

    ty::w(HSTORE_VERSION, meta);
    ty::w(static_cast<int>(this->qos), meta);
    ty::w(static_cast<int>(cm), meta);

    // write the count of storage statements, then each statement's static data
    ty::w(static_cast<uint32_t>(this->statements->size()), meta);
    for (auto s : *this->statements) {
      ty::ws(s.first,         meta);
      ty::w (s.second.flags,  meta);
      ty::ws(s.second.fmtstr, meta);
      ty::ws(s.second.file,   meta);
      ty::w (s.second.line,   meta);
      ty::w (s.second.id,     meta);
      
      bytes td;
      s.second.tdesc(&td,0);
      ty::ws(td, meta);
    }
  }

  wpipe& out() {
    if (PRIV_HSTORE_LIKELY(this->pipe != 0)) {
      return *this->pipe;
    } else {
      std::lock_guard<std::mutex> mqguard(this->mqmtx);
      pts.push_back(thisProcThread());

      // allocate a shared memory queue for this group, register it with the group server
      bytes meta;
      prepareMeta(&meta);
      
      size_t pagesz = sysconf(_SC_PAGESIZE);
      size_t pagec  = 1 + (meta.size() / pagesz) + std::max<size_t>(this->mempages, 10);

      this->pipe =
        new wpipe(
          new writer(meta, sharedMemName(Name::str()), pagesz, pagec, wp),
          this->qos,
          /*if blocked 10ms*/ 10000000L,
          /*reconnect if needed*/
          [&](){
            std::lock_guard<std::mutex> mqguard(this->mqmtx);
            reconnectSHM(&this->mqserver, Name::str(), wp, pts);
          }
        );

      try {
        registerSHMAlloc(&this->mqserver, Name::str(), wp);
      } catch (...) {
        // unreliable mode can accept connection failures
        if (this->qos != Unreliable) {
          throw;
        }
      }
      return *this->pipe;
    }
  }

  inline void init() {
    out();
  }

  inline void commit() {
    out().commit();
  }

  inline void rollback() {
    out().rollback();
  }

  inline void enable() {
    this->enabled = true;
  }

  inline void disable() {
    this->enabled = false;
  }

  inline void reconnect() {
    std::lock_guard<std::mutex> mqguard(this->mqmtx);
    reconnectSHM(&this->mqserver, Name::str(), this->wp, pts);
  }

  // use the log statement name provided by the user (if applicable)
  // if the name is auto:N, then use either N or a name inferred from the type being logged
  // for auto-assigned names, append an incrementing counter if necessary to make the name unique
  static bool isAutoName(const std::string& x) {
    return x.substr(0, 5) == "auto:";
  }

  static std::string decideName(const std::string& x, tynamehints tnhints) {
    if (!isAutoName(x)) {
      return x;
    } else {
      std::string n;
      std::vector<std::string> ns;
      tnhints(&ns);
      if (ns.empty()) {
        n = x.substr(5);
      } else {
        std::ostringstream nss;
        nss << ns[0];
        for (size_t i = 1; i < ns.size(); ++i) {
          nss << "_" << ns[i];
        }
        n = nss.str();
      }
      return n;
    }
  }

  static std::string incrAutoName(const std::string& n) {
    std::ostringstream fnss;
    static std::map<std::string, std::atomic<size_t>*> ctrs;
    auto i = ctrs.find(n);
    if (i != ctrs.end()) {
      fnss << n << *i->second;
      ++*i->second;
    } else {
      fnss << n << "1";
      ctrs[n] = new std::atomic<size_t>(2);
    }
    return fnss.str();
  }

  uint32_t addStatement(const std::string& file, uint32_t line, const std::string& name, size_t flags, const std::string& fmtstr, tydescfn tdesc) {
    if (this->statements->find(name) != this->statements->end()) {
      std::cerr
        << "fatal error: duplicate log name in use (" << name << ")"
        << std::endl;

      exit(-1);
    }

    StmtData d;
    d.tdesc  = tdesc;
    d.flags  = flags;
    d.fmtstr = fmtstr;
    d.file   = file;
    d.line   = line;
    d.id     = static_cast<uint32_t>(this->statements->size());

    (*this->statements)[name] = d;
    return d.id;
  }

  uint32_t allocateStorageStatement(const std::string& file, uint32_t line, const std::string& namet, size_t flags, const std::string& fmtstr, tydescfn tdesc, tynamehints tnhints) {
    if (!this->statements) {
      this->statements = new StorageStatements();
    }
    
    auto name = decideName(namet, tnhints);
    auto s = this->statements->find(name);
    if (s != this->statements->end()) {
      // this statement has already been added
      // make sure that all identifying parameters are consistent
      bytes ety, xty;
      s->second.tdesc(&ety,0);
      tdesc(&xty,nullptr);
      if (ety != xty) {
        if (isAutoName(namet)) {
          return addStatement(file, line, incrAutoName(name), flags, fmtstr, tdesc);
        } else {
          std::string etd, xtd;
          s->second.tdesc(0,&etd);
          tdesc(nullptr,&xtd);

          std::cerr
            << "fatal error: incompatible types for store statement '" << name << "' between:\n"
            << "  " << s->second.file << ":" << s->second.line << " (" << xtd << ")\n"
            << "and\n"
            << "  " << file << ":" << line << " (" << etd << ")\n"
            << std::endl;

          exit(-1);
        }
      }

      if (s->second.fmtstr != fmtstr) {
        if (isAutoName(namet)) {
          return addStatement(file, line, incrAutoName(name), flags, fmtstr, tdesc);
        } else {
          std::cerr
            << "fatal error: incompatible format strings for store statement '" << name << "' between:\n"
            << "  " << s->second.file << ":" << s->second.line << " (" << s->second.fmtstr << ")\n"
            << "and\n"
            << "  " << file << ":" << line << " (" << fmtstr << ")\n"
            << std::endl;

          exit(-1);
        }
      }

      if (s->second.flags != flags) {
        if (isAutoName(namet)) {
          return addStatement(file, line, incrAutoName(name), flags, fmtstr, tdesc);
        } else {
          std::cerr
            << "fatal error: incompatible storage flags for store statement '" << name << "' between:\n"
            << "  " << s->second.file << ":" << s->second.line << " (" << s->second.flags << ")\n"
            << "and\n"
            << "  " << file << ":" << line << " (" << flags << ")\n"
            << std::endl;

          exit(-1);
        }
      }

      return s->second.id;
    }

    // this must be a new statement
    return addStatement(file, line, name, flags, fmtstr, tdesc);
  }
};
template <typename Name, CommitMethod cm>
  thread_local wpipe* StorageGroup<Name, cm>::pipe = nullptr;
template <typename Name, CommitMethod cm>
  std::vector<ProcThread> StorageGroup<Name, cm>::pts;

// write a storage statement with payload into a pipe
// base case (empty payload): zero bytes to serialize, a trivially
// successful write (the non-empty case is the partial specialization below)
template <typename ... Ts>
  struct serialize_values {
    static size_t size(const Ts&...) { return 0; }
    static bool write(wpipe&, const Ts&...) { return true; }
  };
template <typename X, typename ... Xs>
  struct serialize_values<X, Xs...> {
    // total serialized byte count: this value plus the rest of the payload
    static size_t size(const X& v, const Xs&... vs) {
      size_t here = store<X>::size(v);
      return here + serialize_values<Xs...>::size(vs...);
    }
    // write this value, then the rest; stops at the first failed write
    static bool write(wpipe& out, const X& v, const Xs&... vs) {
      if (!store<X>::write(out, v)) {
        return false;
      }
      return serialize_values<Xs...>::write(out, vs...);
    }
  };

// record one statement (id + payload) into an auto-commit group's pipe;
// the pending page is committed first if this payload wouldn't fit
// (note: the pipe is resolved before the enabled check, preserving the
// original behavior of establishing the connection even when disabled)
template <typename GName, typename ... Ts>
  inline bool write(StorageGroup<GName, AutoCommit>* g, uint32_t id, const Ts&... xs) {
    wpipe& out = g->out();
    if (!g->enabled) {
      return false;
    }
    const size_t need = sizeof(uint32_t) + serialize_values<Ts...>::size(xs...);
    if (!out.hasSpaceFor(need)) {
      g->commit();
    }
    if (!store<uint32_t>::write(out, id)) {
      return false;
    }
    return serialize_values<Ts...>::write(out, xs...);
  }
// record one statement into a manual-commit group's pipe; the caller is
// responsible for calling commit()/rollback() on the group
template <typename GName, typename ... Ts>
  inline bool write(StorageGroup<GName, ManualCommit>* g, uint32_t id, const Ts&... xs) {
    wpipe& out = g->out();
    if (!g->enabled) {
      return false;
    }
    return store<uint32_t>::write(out, id) &&
           serialize_values<Ts...>::write(out, xs...);
  }

// validate arity in log format strings (prevent format strings referring to payload variables that don't exist)
// parse the decimal digits fmt[i..e) into an integer, accumulating onto n
// (single-return recursive form so it's constexpr-evaluable under C++11 rules)
template <size_t N>
  static constexpr size_t readInt(const char (&fmt)[N], size_t i, size_t e, size_t n) {
    return (i != e) ? readInt(fmt, i+1, e, n*10 + static_cast<size_t>(fmt[i]-'0')) : n;
  }

// compile-time maximum of two sizes
static constexpr size_t maxV(size_t x, size_t y) {
  return (y > x) ? y : x;
}

// scan a format string for '$<digits>' variable references, returning
// 1 + the largest index referenced (or maxvr if none are found)
//   i     - current scan position
//   s     - state: 0 = scanning text, 1 = reading a digit run after '$'
//   vri   - start index of the digit run currently being read
//   maxvr - best answer so far
// '\' escapes the next character (skipped without inspection); a digit run is
// closed by the first non-digit -- the array's trailing '\0' guarantees that a
// reference at the end of the string is still counted
template <size_t N>
  static constexpr size_t maxVarRefS(const char (&fmt)[N], size_t i, size_t s, size_t vri, size_t maxvr) {
    return  (i >= N) ? maxvr
           :(s == 0) ? ((fmt[i] == '\\') ? maxVarRefS(fmt, i+2, 0, 0, maxvr)
                       :(fmt[i] == '$')  ? maxVarRefS(fmt, i+1, 1, i+1, maxvr)
                       :                   maxVarRefS(fmt, i+1, 0, 0,   maxvr))
           :           ((fmt[i] >= '0' && fmt[i] <= '9') ?
                           maxVarRefS(fmt, i+1, 1, vri, maxvr)
                         : maxVarRefS(fmt, i,   0, 0,   maxV(maxvr, 1+readInt(fmt, vri, i, 0))));
  }

// the payload arity demanded by a format string: one more than the largest
// $N it references (0 when the string references no payload variables)
template <size_t N>
  static constexpr size_t maxVarRef(const char (&fmt)[N]) {
    return maxVarRefS(fmt, 0, 0, 0, 0);
  }

// create statement groups
#define DECLARE_STORAGE_GROUP(NAME, cm)                     extern ::hobbes::storage::StorageGroup<PRIV_HPPF_TSTR(#NAME),cm> NAME
#define DEFINE_STORAGE_GROUP(NAME, pagec, qos, cm, ARGS...) ::hobbes::storage::StorageGroup<PRIV_HPPF_TSTR(#NAME),cm> NAME(pagec, qos, ## ARGS)

// record some data
// APPLY_HSTORE_STMT expands to a single write into GROUP: the statement
// expression yields the statement's id (registering the site on first use via
// StorageStatement's static initializer) and the static_assert rejects format
// strings that reference more payload values than are supplied
#define APPLY_HSTORE_STMT(GROUP, NAME, FLAGS, FMTSTR, ARGS...) \
  ::hobbes::storage::write(&GROUP, ({static_assert(::hobbes::storage::maxVarRef(FMTSTR) <= decltype(::hobbes::storage::makePayloadTypes(ARGS))::count, "Log format string and payload arity mismatch"); ::hobbes::storage::StorageStatement<decltype(GROUP),&GROUP,PRIV_HPPF_TSTR(__FILE__),__LINE__,PRIV_HPPF_TSTR(#NAME),FLAGS,PRIV_HPPF_TSTR(FMTSTR),decltype(::hobbes::storage::makePayloadTypes(ARGS))>::id;}), ## ARGS)

// HSTORE records raw values; HLOG additionally carries a display format string (flag bit 0)
#define HSTORE(GROUP, NAME, ARGS...)       APPLY_HSTORE_STMT(GROUP, NAME, 0,     "", ## ARGS)
#define HLOG(GROUP, NAME, FMTSTR, ARGS...) APPLY_HSTORE_STMT(GROUP, NAME, 1, FMTSTR, ## ARGS)

// run a process to read transaction data
// the reader-side view of one registered storage statement, reconstructed
// from a writer's metadata block
struct statement {
  std::string name;
  uint64_t    flags;
  std::string fmtstr;
  std::string file;
  uint32_t    line;
  uint32_t    id;
  bytes       type;

  // bit 0 of the flags marks a log statement (HLOG) with a display format
  inline bool isLog() const { return (this->flags & 1) == 1; }

  // two statements are equal iff every field matches
  inline bool operator==(const statement& rhs) const {
    return std::tie(this->name, this->flags, this->fmtstr, this->file, this->line, this->id, this->type)
        == std::tie(rhs.name,   rhs.flags,   rhs.fmtstr,   rhs.file,   rhs.line,   rhs.id,   rhs.type);
  }
};
using statements = std::vector<statement>;

// copy n raw bytes at offset o out of the metadata block into b, returning
// the advanced offset; if the block is too short, nothing is copied and the
// end-of-block offset is returned (so subsequent reads become no-ops)
inline size_t rs(const reader::MetaData& md, size_t o, size_t n, uint8_t* b) {
  if (o + n > md.second) {
    return md.second;
  }
  memcpy(b, md.first + o, n);
  return o + n;
}

// read one fixed-size value of type T at offset o, returning the advanced
// offset; on a truncated block *t is left untouched, so callers pre-initialize it
template <typename T>
  size_t r(const reader::MetaData& md, size_t o, T* t) {
    return rs(md, o, sizeof(T), reinterpret_cast<uint8_t*>(t));
  }

// read a length-prefixed string at offset o, returning the advanced offset
inline size_t rs(const reader::MetaData& md, size_t o, std::string* s) {
  size_t len = 0;
  size_t at = r(md, o, &len);
  s->resize(len);
  return rs(md, at, len, reinterpret_cast<uint8_t*>(&(*s)[0]));
}

// read a length-prefixed byte sequence at offset o, returning the advanced offset
inline size_t rs(const reader::MetaData& md, size_t o, bytes* s) {
  size_t len = 0;
  size_t at = r(md, o, &len);
  s->resize(len);
  return rs(md, at, len, &(*s)[0]);
}

// buffers one transaction's data as it's read out of a shared memory queue,
// optionally persisting it to a file so a partially-read transaction can
// survive consumer restarts
//
// persistent file layout: [size_t payload size][payload bytes ...]
class Transaction {
public:
  // a persistent transaction to retain log data between consumer sessions (if necessary)
  Transaction(std::string fname) : fd(-1), file_size(0), map_size(0), data(nullptr), data_size(nullptr), wi(0), ri(0) {
    // derive a filesystem-safe file name from the queue name
    for (char &i : fname) {
      switch (i) {
      case '/': case ':': case '[': case ']': i='_'; break;
      default: break;
      }
    }
    fname = "." + fname + ".txn";
    this->fd = ::open(fname.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);

    if (this->fd < 0) {
      throw std::runtime_error("Unable to open transaction persistence file: " + fname + " (" + std::string(strerror(errno)) + ")");
    }

    struct stat sb;
    if (::fstat(this->fd, &sb) < 0) {
      close(this->fd);
      throw std::runtime_error("Failed to stat transaction persistence file: " + fname + " (" + std::string(strerror(errno)) + ")");
    }
    try {
      truncMapFile(sb.st_size == 0 ? /*1MB*/1048576 : sb.st_size);
    } catch (...) {
      // the destructor won't run if the constructor throws, so release the fd here
      close(this->fd);
      throw;
    }
  }

  // a non-persistent transaction over constant data (just used by remote consumers)
  Transaction(const uint8_t* d, size_t len) : fd(-1), file_size(len), map_size(len), data(const_cast<uint8_t*>(d)), data_size(nullptr), wi(0), ri(0) {
    this->data_size = &this->map_size;
  }

  // this type owns a file descriptor and a mapping; copying would alias them
  // and cause a double-release in the destructor
  Transaction(const Transaction&) = delete;
  Transaction& operator=(const Transaction&) = delete;

  // release the mapping and the persistence file; a no-op in the
  // non-persistent, constant-data mode (where fd == -1 and data is borrowed)
  ~Transaction() {
    if (this->fd >= 0) {
      if (this->data != nullptr) {
        // 'data' points just past the size_t header at the start of the mapping
        ::munmap(this->data - sizeof(size_t), this->map_size);
      }
      ::close(this->fd);
    }
  }

  // true iff x more bytes can be read from the current read position
  bool canRead(size_t x) const {
    return (this->ri+x) <= *this->data_size;
  }

  // the current read position
  const uint8_t* ptr() const {
    return this->data + this->ri;
  }

  // advance the read position by d bytes
  void skip(size_t d) {
    this->ri += d;
  }

  // read one fixed-size value in place (no copy) and advance past it
  template <typename T>
    const T* read() {
      const auto *p = reinterpret_cast<const T*>(ptr());
      skip(sizeof(T));
      return p;
    }

  // total bytes buffered for the current transaction
  size_t size() const {
    return *this->data_size;
  }

  // discard the buffered transaction and reset both cursors
  void clear() {
    *this->data_size = 0;
    this->wi = 0;
    this->ri = 0;
  }

  // pull the next block of transaction data out of the pipe; returns true
  // when a complete (committed) transaction is buffered, false on a rollback
  // or when more data is still pending
  bool readToCompletion(rpipe& p, size_t timeoutNS, const std::function<void()>& timeoutF) {
    static const size_t blockSize = 1024;

    // grow the persistence file ahead of the write cursor
    if (this->wi + blockSize >= this->file_size) {
      truncMapFile(this->file_size + /*1MB*/1048576);
    }

    uint8_t txnFlag = 0;
    this->wi += p.read(this->data + this->wi, blockSize, &txnFlag, timeoutNS, timeoutF);
    *this->data_size = this->wi;

    switch (txnFlag) {
    case PRIV_HSTORE_PAGE_STATE_ROLLBACK:
      // the writer rolled back: discard everything buffered so far
      clear();
      return false;
    case PRIV_HSTORE_PAGE_STATE_COMMIT: {
      return true;
    }
    default:
      return false;
    }
  }
private:
  int    fd;        // persistence file (-1 in constant-data mode)
  size_t file_size; // current size of the persistence file
  size_t map_size;  // current size of the mapping (>= file_size, 10MB-aligned)

  uint8_t* data;      // transaction payload (just past the size header when mapped)
  size_t*  data_size; // persisted payload size (points into the mapping)
  size_t   wi;        // write cursor
  size_t   ri;        // read cursor

  // grow the persistence file to sz bytes and (re)map it if the mapping is
  // too small; only valid for growth (sz >= file_size), which callers ensure
  void truncMapFile(size_t sz) {
    off_t dsz = sz - this->file_size;
    int r = ::posix_fallocate(this->fd, this->file_size, dsz);
    if (r != 0) {
      if (r == ENOSPC) {
        throw std::runtime_error("Failed to expand transaction persistence file (no space available)");
      } else {
        throw std::runtime_error("Failed to expand transaction persistence file");
      }
    }
    this->file_size = sz;

    if (sz > this->map_size) {
      if (this->data != nullptr) { ::munmap(this->data-sizeof(size_t), this->map_size); }

      this->map_size = align<size_t>(sz, /*10MB*/10485760);
      this->data = reinterpret_cast<uint8_t*>(mmap(nullptr, this->map_size, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, this->fd, 0));
      if (this->data == MAP_FAILED) {
        throw std::runtime_error("Failed to map transaction persistence file");
      }
      // the first size_t of the file records the persisted payload size
      this->data_size = reinterpret_cast<size_t*>(this->data);
      this->data      += sizeof(size_t);
    }
  }
};

// attach to a writer's shared memory queue and loop forever, handing each
// committed transaction to the callback produced by 'initF'
//
// initF receives the writer's QoS, commit method, and statement table (parsed
// out of the queue's metadata block) and returns the per-transaction handler;
// timeoutF/userTimeoutF are invoked when a read waits longer than timeoutNS
[[noreturn]] inline void runReadProcessWithTimeout(const QueueConnection& qc, const WaitPolicy wp, const std::function<std::function<void(Transaction&)>(PipeQOS, CommitMethod, const statements&)>& initF, size_t timeoutNS, const std::function<void(const reader&)>& userTimeoutF) {
  Transaction txn(qc.shmname);
  reader      rd(qc, wp);
  rpipe       p(&rd);

  // initialize
  // (the metadata layout parsed below must mirror StorageGroup::prepareMeta)
  reader::MetaData md = rd.meta();

  uint32_t hstoreVersion = 0;
  size_t o = r(md, 0, &hstoreVersion);
  if (hstoreVersion != HSTORE_VERSION) {
    throw std::runtime_error("Can't read storage data from incompatible process");
  }

  // read group flags/settings
  int qos=0, cm=0;
  o = r(md, o, &qos);
  o = r(md, o, &cm);

  // read all storage statements
  uint32_t n = 0;
  o = r(md, o, &n);
  statements ss;
  ss.reserve(n);
  for (size_t i = 0; i < n; ++i) {
    statement s;
    o = rs(md, o, &s.name);
    o = r (md, o, &s.flags);
    o = rs(md, o, &s.fmtstr);
    o = rs(md, o, &s.file);
    o = r (md, o, &s.line);
    o = r (md, o, &s.id);
    o = rs(md, o, &s.type);
    ss.push_back(s);
  }

  auto txnF = initF(static_cast<PipeQOS>(qos), static_cast<CommitMethod>(cm), ss);

  // wrap the user timeout callback so it can observe the reader
  auto timeoutF = [&rd, userTimeoutF]() {
    userTimeoutF(rd);
  };

  // read transactions and call back into user code
  while (true) {
    if (txn.readToCompletion(p, timeoutNS, timeoutF)) {
      txnF(txn);
      txn.clear();
    }
  }
}

// as runReadProcessWithTimeout, but with no read timeout or timeout callback
[[noreturn]] inline void runReadProcess(const QueueConnection& qc, const WaitPolicy wp, const std::function<std::function<void(Transaction&)>(PipeQOS, CommitMethod, const statements&)>& initF) {
  runReadProcessWithTimeout(qc, wp, initF, 0, [](const reader&){});
}

}}

#endif
